Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com.INVALID> on 2017/05/18 22:44:02 UTC

Re: Beam Fn API

Now that I'm back from vacation and the 2.0.0 release is not taking all my
time, I am focusing my attention on the Beam Portability framework,
specifically the Fn API, so that we can get Python and other language
integrations to work with any runner.

For newcomers, I would like to reshare the overview:
https://s.apache.org/beam-fn-api

And for those of you who have been following this thread and contributors
focusing on Runner integration with Apache Beam:
* How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data

If you want to dive deeper, you should look at:
* Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
* Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
* Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/java/harness
* Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker

Next I'm planning on talking about the Beam Fn State API and will need help
from Runner contributors to discuss caching semantics and key spaces, and
whether the integrations mesh well with current Runner implementations. The
State API is meant to support user state, side inputs, and re-iteration for
large values produced by GroupByKey.

On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:

> Yes, I was using a Pipeline that was:
> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
> pipeline in the global window using the default trigger)
>
> In Google Cloud Dataflow, the shuffle step uses the binary representation
> to compare keys, so the above pipeline would normally be converted to the
> following two stages:
> Read -> GBK Writer
> GBK Reader -> IdentityParDo
>
> Note that the GBK Writer and GBK Reader need to use a coder to encode and
> decode the value.
>
> When using the Fn API, those two stages expanded because of the Fn API
> crossings, each of which uses a gRPC Write/Read pair:
> Read -> gRPC Write -> gRPC Read -> GBK Writer
> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>
> In my naive prototype implementation, the coder was used to encode
> elements at the gRPC steps. This meant that the coder was
> encoding/decoding/encoding in the first stage and
> decoding/encoding/decoding in the second stage, which tripled the number
> of times the coder was invoked per element. This additional use of the
> coder accounted for ~12 percentage points (80%) of the 15% extra execution
> time.
>
> This implementation is quite inefficient. It would benefit from merging
> the gRPC Read + GBK Writer into one actor, and the GBK Reader + gRPC Write
> into another, allowing for a fast path that can skip parts of the
> decode/encode cycle through the coder. By using a byte array view over the
> logical stream, one can minimize the number of byte array copies, which
> plagued my naive implementation. This can be done by parsing only the
> element boundaries out of the stream to produce those logical byte array
> views. My very rough estimate is that this optimization would reduce the
> 12% overhead to somewhere between 4% and 6%.
>
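A minimal sketch of the byte-array-view idea described above, assuming a hypothetical framing in which each element is a 4-byte big-endian length followed by its payload. This framing and the names `element_views` and `frame` are illustrative assumptions, not the Fn API wire format or the harness code. Only the element boundaries are parsed; the payload bytes are never decoded or copied.

```python
import struct


def element_views(buffer):
    """Yield zero-copy views over each length-prefixed element in `buffer`.

    Assumed (hypothetical) framing: 4-byte big-endian length, then payload.
    """
    view = memoryview(buffer)
    offset = 0
    while offset < len(view):
        # Parse only the element boundary, not the element itself.
        (length,) = struct.unpack_from(">I", view, offset)
        start = offset + 4
        end = start + length
        if end > len(view):
            raise ValueError("truncated element")
        # Slicing a memoryview references the same bytes; nothing is copied.
        yield view[start:end]
        offset = end


def frame(payloads):
    """Build a stream in the assumed framing (helper for the example only)."""
    return b"".join(struct.pack(">I", len(p)) + p for p in payloads)


stream = frame([b"key1value1", b"k2", b""])
views = list(element_views(stream))
```

Each view can be handed on to the next step (for example a shuffle writer) as already-encoded bytes, which is what lets a fast path skip a decode/encode round trip through the coder.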
> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
> * marshalling/unmarshalling protos
> * handling/managing the socket
> * flow control
> * ...
>
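The overhead figures quoted above can be checked with a few lines of arithmetic. The last step is an assumption that is not stated in the message: it supposes the gRPC share stays at 3% while the coder share drops to the estimated 4% to 6%.

```python
# Figures taken from the message, in whole percentage points.
total_overhead_pct = 15   # extra execution time measured with the Fn API prototype
coder_share_pct = 80      # share of that extra time attributed to the coder

coder_overhead_pct = total_overhead_pct * coder_share_pct // 100
grpc_overhead_pct = total_overhead_pct - coder_overhead_pct

# Rough estimate from the message: coder overhead after the fast-path optimization.
optimized_coder_range = (4, 6)

# Assumption (not in the message): gRPC overhead is unchanged by the optimization.
optimized_total_range = tuple(grpc_overhead_pct + c for c in optimized_coder_range)

print(coder_overhead_pct, grpc_overhead_pct, optimized_total_range)
```

Under that assumption, the total extra execution time would fall from 15% to roughly 7% to 9%.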
> Finally, I did try experiments with different buffer sizes (10KB, 100KB,
> 1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
> and channel type [3] (NIO, epoll, domain socket), but coder overhead easily
> dominated the differences in these other experiments.
>
> Further analysis would need to be done to more accurately distill this
> down.
>
> 1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
> 2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
> 3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
>
>
> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Awesome job Lukasz, excellent! I have to confess the first time I heard
>> about the Fn API idea I was a bit incredulous, but you are making it real,
>> amazing!
>>
>> Just one question from your document: you said that 80% of the extra (15%)
>> time goes into encoding and decoding the data for your test case. Can you
>> expand on your current ideas to improve this? (I am not sure I completely
>> understand the issue.)
>>
>>
>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid> wrote:
>>
>> > Responded inline.
>> >
>> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:
>> >
>> > > This is truly amazing Luke!
>> > >
>> > > If I understand this right, the runner executing the DoFn will delegate
>> > > the function code and input data (and state, coders, etc.) to the
>> > > container where it will execute with the user's SDK of choice, right?
>> >
>> >
>> > Yes, that is correct.
>> >
>> >
>> > > I wonder how the containers relate to the underlying engine's worker
>> > > processes. Is it 1-1, a container per worker? If there's less "work"
>> > > for the worker's Java process (for example) now, and it becomes a sort
>> > > of "dispatcher", would that change the resource allocation commonly
>> > > used for the same Pipeline, so that the worker processes would require
>> > > fewer resources while giving those to the container?
>> > >
>> >
>> > I think with the four services (control, data, state, logging) you can go
>> > with a 1-1 relationship, or break it up in a more fine-grained way and
>> > dedicate some machines to specific tasks. For example, you could have a
>> > few machines dedicated to log aggregation, to which all the workers push
>> > their logs. Similarly, you could have some machines with a lot of memory,
>> > which would be better suited to doing shuffles in memory, and this
>> > cluster of high-memory machines could front the data service. I believe
>> > there is a lot of flexibility based upon what a runner can do and what it
>> > specializes in, and that with more effort come more possibilities,
>> > albeit with increased internal complexity.
>> >
>> > The layout of resources depends on whether the services and SDK
>> > containers are co-hosted on the same machine or whether there is a
>> > different architecture in play. In a co-hosted configuration, it seems
>> > likely that the SDK container will get more resources, but this depends
>> > on the runner and pipeline shape (shuffle-heavy pipelines will look
>> > different than ParDo-dominated pipelines).
>> >
>> >
>> > > About executing sub-graphs, would it be true to say that as long as
>> > > there's no shuffle, you could keep executing in the same container?
>> > > Meaning that the graph is broken into sub-graphs by shuffles?
>> > >
>> >
>> > The only thing that is required is that the Apache Beam model is
>> > preserved, so typical break points will be at shuffles and language
>> > crossing points (e.g. Python ParDo -> Java ParDo). A runner is free to
>> > break up the graph even more for other reasons.
>> >
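A rough sketch of the break-point rule described above, for a purely linear pipeline. The representation (a list of `(name, language)` pairs with a `SHUFFLE` marker) and the function `split_into_stages` are hypothetical and chosen only for illustration; a real runner works on the full pipeline graph and may split further for its own reasons.

```python
# Hypothetical marker for a shuffle (e.g. GroupByKey) in a linear pipeline.
SHUFFLE = ("GroupByKey", None)


def split_into_stages(transforms):
    """Split a linear pipeline into stages at shuffles and language crossings.

    `transforms` is a list of (name, language) pairs; SHUFFLE entries act as
    boundaries and do not belong to any stage in this simplified model.
    """
    stages = []
    current = []
    for name, language in transforms:
        if language is None:
            # Shuffle boundary: close the stage that feeds the shuffle.
            if current:
                stages.append(current)
            current = []
            continue
        if current and current[-1][1] != language:
            # Language crossing (e.g. Python ParDo -> Java ParDo).
            stages.append(current)
            current = []
        current.append((name, language))
    if current:
        stages.append(current)
    return [[name for name, _ in stage] for stage in stages]


single_language = [("Read", "java"), ("ParDo", "java"), SHUFFLE,
                   ("IdentityParDo", "java")]
mixed_language = [("Read", "java"), ("PyParDo", "python"),
                  ("JavaParDo", "java"), SHUFFLE, ("IdentityParDo", "java")]

print(split_into_stages(single_language))
print(split_into_stages(mixed_language))
```

In the single-language case only the shuffle splits the pipeline; in the mixed case each language crossing adds a further break.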
>> >
>> > > I have to dig in deeper, so I could have more questions ;-) thanks Luke!
>> > >
>> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lcwik@google.com.invalid>
>> > > wrote:
>> > >
>> > > > I updated the PR description to contain the same.
>> > > >
>> > > > I would start by looking at the API/object model definitions found in
>> > > > beam_fn_api.proto
>> > > > <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>
>> > > >
>> > > > Then depending on your interest, look at the following:
>> > > > * FnHarness.java
>> > > > <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java>
>> > > > is the main entry point.
>> > > > * org.apache.beam.fn.harness.data
>> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
>> > > > contains the most interesting bits of code since it is able to
>> > > > multiplex a gRPC stream into multiple logical streams of elements bound
>> > > > for multiple concurrent process bundle requests. It also contains the
>> > > > code to take multiple logical outbound streams and multiplex them back
>> > > > onto a gRPC stream.
>> > > > * org.apache.beam.runners.core
>> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
>> > > > contains additional runners akin to the DoFnRunner found in
>> > > > runners-core to support sources and gRPC endpoints.
>> > > >
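To illustrate the multiplexing idea mentioned for the data package, here is a minimal sketch that tags elements with a logical stream id, sends them over one physical stream, and splits them back apart. The `(stream_id, payload)` message shape and the ids used are assumptions for illustration only; the actual message layout is defined in beam_fn_api.proto and the real harness does this over gRPC with flow control.

```python
from collections import defaultdict


def multiplex(logical_streams):
    """Tag every payload with its logical stream id for one physical stream."""
    for stream_id, payloads in logical_streams.items():
        for payload in payloads:
            yield (stream_id, payload)


def demultiplex(physical_stream):
    """Route tagged messages from one physical stream to per-stream lists."""
    logical = defaultdict(list)
    for stream_id, payload in physical_stream:
        # Order within each logical stream is preserved.
        logical[stream_id].append(payload)
    return dict(logical)


# Hypothetical ids: (process bundle request, input name).
outbound = {
    ("bundle-1", "input"): [b"a", b"b"],
    ("bundle-2", "input"): [b"c"],
}
received = demultiplex(multiplex(outbound))
```

The key property is that several concurrent process bundle requests can share one connection while each still sees its own ordered stream of elements.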
>> > > > Unless you're really interested in how domain socket, epoll, or NIO
>> > > > channel factories work, or how stream readiness callbacks work in gRPC,
>> > > > I would avoid the packages org.apache.beam.fn.harness.channel and
>> > > > org.apache.beam.fn.harness.stream. Similarly, I would avoid
>> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake, as
>> > > > they don't add anything meaningful to the API.
>> > > >
>> > > > Code package descriptions:
>> > > >
>> > > > * org.apache.beam.fn.harness.FnHarness: main entry point
>> > > > * org.apache.beam.fn.harness.control: Control service client and
>> > > >   individual request handlers
>> > > > * org.apache.beam.fn.harness.data: Data service client and logical
>> > > >   stream multiplexing
>> > > > * org.apache.beam.runners.core: Additional runners akin to the
>> > > >   DoFnRunner found in runners-core to support sources and gRPC endpoints
>> > > > * org.apache.beam.fn.harness.logging: Logging client implementation and
>> > > >   JUL logging handler adapter
>> > > > * org.apache.beam.fn.harness.channel: gRPC channel management
>> > > > * org.apache.beam.fn.harness.stream: gRPC stream management
>> > > > * org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
>> > > >
>> > > >
>> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
>> > > > <klk@google.com.invalid> wrote:
>> > > >
>> > > > > This is awesome! Any chance you could roadmap the PR for us with some
>> > > > > links into the most interesting bits?
>> > > > >
>> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>> > > > > robertwb@google.com.invalid> wrote:
>> > > > >
>> > > > > > Also, note that we can still support the "simple" case. For example,
>> > > > > > if the user supplies us with a jar file (as they do now), a runner
>> > > > > > could launch it as a subprocess and communicate with it via this
>> > > > > > same Fn API, or install it in a fixed container itself--the user
>> > > > > > doesn't *need* to know about docker or manually manage containers
>> > > > > > (and indeed the Fn API could be used in-process, cross-process,
>> > > > > > cross-container, and even cross-machine).
>> > > > > >
>> > > > > > However, docker provides a nice cross-language way of specifying the
>> > > > > > environment including all dependencies (especially for languages
>> > > > > > like Python or C where the equivalent of a cross-platform,
>> > > > > > self-contained jar isn't as easy to produce) and is strictly more
>> > > > > > powerful and flexible (specifically it isolates the runtime
>> > > > > > environment and one can even use it for local testing).
>> > > > > >
>> > > > > > Slicing a worker up like this without sacrificing performance is an
>> > > > > > ambitious goal, but essential to the story of being able to mix and
>> > > > > > match runners and SDKs arbitrarily, and I think this is a great
>> > > > > > start.
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
>> > > > > > <lcwik@google.com.invalid> wrote:
>> > > > > > > You're correct, a docker container is created that contains the
>> > > > > > > execution environment the user wants, or the user re-uses an
>> > > > > > > existing one (allowing for a user to embed all their
>> > > > > > > code/dependencies or use a container that can deploy
>> > > > > > > code/dependencies on demand).
>> > > > > > > A user creates a pipeline saying which docker container they want
>> > > > > > > to use (this starts to allow for multiple container definitions
>> > > > > > > within a single pipeline to support multiple languages,
>> > > > > > > versioning, ...).
>> > > > > > > A runner would then be responsible for launching one or more of
>> > > > > > > these containers in a cluster manager of their choice (scaling up
>> > > > > > > or down the number of instances depending on demand/load/...).
>> > > > > > > A runner then interacts with the docker containers over the gRPC
>> > > > > > > service definitions to delegate processing to them.
>> > > > > > >
>> > > > > > >
>> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré
>> > > > > > > <jb@nanthrax.net> wrote:
>> > > > > > >
>> > > > > > >> Hi Luke,
>> > > > > > >>
>> > > > > > >> that's really great and very promising!
>> > > > > > >>
>> > > > > > >> It's really ambitious but I like the idea. Just to clarify: the
>> > > > > > >> purpose of using gRPC is that once the docker container is
>> > > > > > >> running, we can "interact" with the container to spread and
>> > > > > > >> delegate processing to it, correct?
>> > > > > > >> The users/devops have to set up the docker containers as a
>> > > > > > >> prerequisite. Then, the "location" of the containers (a kind of
>> > > > > > >> container registry) is set via the pipeline options and used by
>> > > > > > >> gRPC?
>> > > > > > >>
>> > > > > > >> Thanks Luke !
>> > > > > > >>
>> > > > > > >> Regards
>> > > > > > >> JB
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>> > > > > > >>
>> > > > > > >>> I have been prototyping several components towards the Beam
>> > > > > > >>> technical vision of being able to execute an arbitrary language
>> > > > > > >>> using an arbitrary runner.
>> > > > > > >>>
>> > > > > > >>> I would like to share this overview [1] of what I have been
>> > > > > > >>> working towards. I am also sharing this PR [2] with a proposed
>> > > > > > >>> API, service definitions, and a partial implementation.
>> > > > > > >>>
>> > > > > > >>> 1: https://s.apache.org/beam-fn-api
>> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
>> > > > > > >>>
>> > > > > > >>> Please comment on the overview within this thread, and any
>> > > > > > >>> specific code comments on the PR directly.
>> > > > > > >>>
>> > > > > > >>> Luke
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >> --
>> > > > > > >> Jean-Baptiste Onofré
>> > > > > > >> jbonofre@apache.org
>> > > > > > >> http://blog.nanthrax.net
>> > > > > > >> Talend - http://www.talend.com
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Re: Beam Fn API

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
Thanks! Looks good. I've added some comments to the doc.

On Wed, May 31, 2017 at 7:00 AM, Etienne Chauchot <ec...@gmail.com>
wrote:

> Thanks for all these docs! They are exactly what was needed for new
> contributors as discussed in this thread
>
> https://lists.apache.org/thread.html/ac93d29424e19d57097373b78f3f5bcbc701e4b51385a52a6e27b7ed@%3Cdev.beam.apache.org%3E
>
> Etienne
>
>
> On 31/05/2017 at 11:12, Aljoscha Krettek wrote:
>
>> Thanks for banging these out Lukasz. I’ll try and read them all this week.
>>
>> We’re also planning to add support for the Fn API to the Flink Runner so
>> that we can execute Python programs. I’m sure we’ll get some valuable
>> feedback for you while doing that.
>>
>> On 26. May 2017, at 22:49, Lukasz Cwik <lc...@google.com.INVALID> wrote:
>>
>>> I would like to share another document about the Fn API. This document
>>> specifically discusses how to access side inputs, access remote
>>> references (e.g. large iterables for hot keys produced by a GBK), and
>>> support user state.
>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing
>>>
>>> The document does require a strong foundation in the Apache Beam model
>>> and a good understanding of the prior shared docs:
>>> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
>>> * How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data
>>>
>>> I could really use the help of runner contributors to review the caching
>>> semantics within the SDK harness and whether they would work well for the
>>> runner they contribute to the most.
>>>
>>> On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Manu, the goal is to share here initially, update the docs addressing
>>>> people's comments, and then publish them on the website once they are
>>>> stable enough.
>>>>
>>>> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Lukasz. The following two links were somehow incorrectly
>>>>> formatted in your mail.
>>>>>
>>>>> * How to process a bundle:
>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle
>>>>> * How to send and receive data:
>>>>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>>>>
>>>>> By the way, is there a way to find them from the Beam website ?
>>>>>
>>>>>
>>>>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>>>>> wrote:
>>>>>
>>>>> Now that I'm back from vacation and the 2.0.0 release is not taking all
>>>>>>
>>>>> my
>>>>>
>>>>>> time, I am focusing my attention on working on the Beam Portability
>>>>>> framework, specifically the Fn API so that we can get Python and other
>>>>>> language integrations work with any runner.
>>>>>>
>>>>>> For new comers, I would like to reshare the overview:
>>>>>> https://s.apache.org/beam-fn-api
>>>>>>
>>>>>> And for those of you who have been following this thread and
>>>>>>
>>>>> contributors
>>>>>
>>>>>> focusing on Runner integration with Apache Beam:
>>>>>> * How to process a bundle: https://s.apache.org/beam-fn-a
>>>>>>
>>>>> pi-processing-a-
>>>>>
>>>>>> bundle
>>>>>> * How to send and receive data: https://s.apache.org/
>>>>>> beam-fn-api-send-and-receive-data
>>>>>>
>>>>>> If you want to dive deeper, you should look at:
>>>>>> * Runner API Protobuf: https://github.com/apache/beam
>>>>>> /blob/master/sdks/
>>>>>> common/runner-api/src/main/proto/beam_runner_api.proto
>>>>>> <https://github.com/apache/beam/blob/master/sdks/common/runn
>>>>>>
>>>>> er-api/src/main/proto/beam_runner_api.proto>
>>>>>
>>>>>> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>>>>>> common/fn-api/src/main/proto/beam_fn_api.proto
>>>>>> <https://github.com/apache/beam/blob/master/sdks/common/fn-
>>>>>>
>>>>> api/src/main/proto/beam_fn_api.proto>
>>>>>
>>>>>> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>>>>>> java/harness
>>>>>> <https://github.com/apache/beam/tree/master/sdks/java/harness>
>>>>>> * Python SDK Harness: https://github.com/apache/beam
>>>>>> /tree/master/sdks/
>>>>>> python/apache_beam/runners/worker
>>>>>> <https://github.com/apache/beam/tree/master/sdks/python/apac
>>>>>>
>>>>> he_beam/runners/worker>
>>>>>
>>>>>> Next I'm planning on talking about Beam Fn State API and will need
>>>>>> help
>>>>>> from Runner contributors to talk about caching semantics and key
>>>>>> spaces
>>>>>>
>>>>> and
>>>>>
>>>>>> whether the integrations mesh well with current Runner
>>>>>> implementations.
>>>>>>
>>>>> The
>>>>>
>>>>>> State API is meant to support user state, side inputs, and
>>>>>> re-iteration
>>>>>>
>>>>> for
>>>>>
>>>>>> large values produced by GroupByKey.
>>>>>>
>>>>>> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>> Yes, I was using a Pipeline that was:
>>>>>>> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a
>>>>>>>
>>>>>> batch
>>>>>
>>>>>> pipeline in the global window using the default trigger)
>>>>>>>
>>>>>>> In Google Cloud Dataflow, the shuffle step uses the binary
>>>>>>>
>>>>>> representation
>>>>>
>>>>>> to compare keys, so the above pipeline would normally be converted to
>>>>>>>
>>>>>> the
>>>>>
>>>>>> following two stages:
>>>>>>> Read -> GBK Writer
>>>>>>> GBK Reader -> IdentityParDo
>>>>>>>
>>>>>>> Note that the GBK Writer and GBK Reader need to use a coder to encode
>>>>>>>
>>>>>> and
>>>>>
>>>>>> decode the value.
>>>>>>>
>>>>>>> When using the Fn API, those two stages expanded because of the Fn
>>>>>>> Api
>>>>>>> crossings using a gRPC Write/Read pair:
>>>>>>> Read -> gRPC Write -> gRPC Read -> GBK Writer
>>>>>>> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>>>>>>>
>>>>>>> In my naive prototype implementation, the coder was used to encode
>>>>>>> elements at the gRPC steps. This meant that the coder was
>>>>>>> encoding/decoding/encoding in the first stage and
>>>>>>> decoding/encoding/decoding in the second stage. This tripled the
>>>>>>>
>>>>>> amount
>>>>>
>>>>>> of
>>>>>>
>>>>>>> times the coder was being invoked per element. This additional use of
>>>>>>>
>>>>>> the
>>>>>
>>>>>> coder accounted for ~12% (80% of the 15%) of the extra execution time.
>>>>>>>
>>>>>> This
>>>>>>
>>>>>>> implementation is quite inefficient and would benefit from merging
>>>>>>> the
>>>>>>>
>>>>>> gRPC
>>>>>>
>>>>>>> Read + GBK Writer into one actor and also the GBK Reader + gRPC Write
>>>>>>>
>>>>>> into
>>>>>>
>>>>>>> another actor allowing for the creation of a fast path that can skip
>>>>>>>
>>>>>> parts
>>>>>>
>>>>>>> of the decode/encode cycle through the coder. By using a byte array
>>>>>>>
>>>>>> view
>>>>>
>>>>>> over the logical stream, one can minimize the number of byte array
>>>>>>>
>>>>>> copies
>>>>>
>>>>>> which plagued my naive implementation. This can be done by only
>>>>>>>
>>>>>> parsing
>>>>>
>>>>>> the
>>>>>>
>>>>>>> element boundaries out of the stream to produce those logical byte
>>>>>>>
>>>>>> array
>>>>>
>>>>>> views. I have a very rough estimate that performing this optimization
>>>>>>>
>>>>>> would
>>>>>>
>>>>>>> reduce the 12% overhead to somewhere between 4% and 6%.
>>>>>>>
>>>>>>> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>>>>>>> marshalling/unmarshalling protos
>>>>>>> handling/managing the socket
>>>>>>> flow control
>>>>>>> ...
>>>>>>>
>>>>>>> Finally, I did try experiments with different buffer sizes (10KB,
>>>>>>>
>>>>>> 100KB,
>>>>>
>>>>>> 1000KB), flow control (separate thread[1] vs same thread with
>>>>>>>
>>>>>> phaser[2]),
>>>>>
>>>>>> and channel type [3] (NIO, epoll, domain socket), but coder overhead
>>>>>>>
>>>>>> easily
>>>>>>
>>>>>>> dominated the differences in these other experiments.
>>>>>>>
>>>>>>> Further analysis would need to be done to more accurately distill
>>>>>>> this
>>>>>>> down.
>>>>>>>
>>>>>>> 1: https://github.com/lukecwik/incubator-beam/blob/
>>>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>>>>>
>>>>>> rness/stream/
>>>>>
>>>>>> BufferingStreamObserver.java
>>>>>>> 2: https://github.com/lukecwik/incubator-beam/blob/
>>>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>>>>>
>>>>>> rness/stream/
>>>>>
>>>>>> DirectStreamObserver.java
>>>>>>> 3: https://github.com/lukecwik/incubator-beam/blob/
>>>>>>>
>>>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>>>>
>>>>> rness/channel/
>>>>>
>>>>>> ManagedChannelFactory.java
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com>
>>>>>>>
>>>>>> wrote:
>>>>>
>>>>>> Awesome job Lukasz, Excellent, I have to confess the first time I
>>>>>>>>
>>>>>>> heard
>>>>>
>>>>>> about
>>>>>>>> the Fn API idea I was a bit incredulous, but you are making it real,
>>>>>>>> amazing!
>>>>>>>>
>>>>>>>> Just one question from your document, you said that 80% of the extra
>>>>>>>>
>>>>>>> (15%)
>>>>>>
>>>>>>> time
>>>>>>>> goes into encoding and decoding the data for your test case, can you
>>>>>>>> expand
>>>>>>>> in
>>>>>>>> your current ideas to improve this? (I am not sure I completely
>>>>>>>>
>>>>>>> understand
>>>>>>
>>>>>>> the
>>>>>>>> issue).
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik
>>>>>>>>
>>>>>>> <lc...@google.com.invalid>
>>>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>> Responded inline.
>>>>>>>>>
>>>>>>>>> On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com>
>>>>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This is truly amazing Luke!
>>>>>>>>>>
>>>>>>>>>> If I understand this right, the runner executing the DoFn will
>>>>>>>>>>
>>>>>>>>> delegate
>>>>>>>>
>>>>>>>>> the
>>>>>>>>>
>>>>>>>>>> function code and input data (and state, coders, etc.) to the
>>>>>>>>>>
>>>>>>>>> container
>>>>>>>>
>>>>>>>>> where it will execute with the user's SDK of choice, right ?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, that is correct.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I wonder how the containers relate to the underlying engine's
>>>>>>>>>>
>>>>>>>>> worker
>>>>>
>>>>>> processes ? is it a 1-1, container per worker ? if there's less
>>>>>>>>>>
>>>>>>>>> "work"
>>>>>>
>>>>>>> for
>>>>>>>>>
>>>>>>>>>> the worker's Java process (for example) now and it becomes a
>>>>>>>>>>
>>>>>>>>> sort of
>>>>>
>>>>>> "dispatcher", would that change the resource allocation commonly
>>>>>>>>>>
>>>>>>>>> used
>>>>>>
>>>>>>> for
>>>>>>>>
>>>>>>>>> the same Pipeline so that the worker processes would require less
>>>>>>>>>> resources, while giving those to the container ?
>>>>>>>>>>
>>>>>>>>>> I think with the four services (control, data, state, logging) you
>>>>>>>>>
>>>>>>>> can
>>>>>
>>>>>> go
>>>>>>>>
>>>>>>>>> with a 1-1 relationship or break it up more finely grained and
>>>>>>>>>
>>>>>>>> dedicate
>>>>>>
>>>>>>> some machines to have specific tasks. Like you could have a few
>>>>>>>>>
>>>>>>>> machines
>>>>>>
>>>>>>> dedicated to log aggregation which all the workers push their logs
>>>>>>>>>
>>>>>>>> to.
>>>>>
>>>>>> Similarly, you could have some machines that have a lot of memory
>>>>>>>>>
>>>>>>>> which
>>>>>>
>>>>>>> would be better to be able to do shuffles in memory and then this
>>>>>>>>>
>>>>>>>> cluster
>>>>>>>>
>>>>>>>>> of high memory machines could front the data service. I believe
>>>>>>>>>
>>>>>>>> there
>>>>>
>>>>>> is a
>>>>>>>>
>>>>>>>>> lot of flexibility based upon what a runner can do and what it
>>>>>>>>>
>>>>>>>> specializes
>>>>>>>>
>>>>>>>>> in and believe that with more effort comes more possibilities
>>>>>>>>>
>>>>>>>> albeit
>>>>>
>>>>>> with
>>>>>>>>
>>>>>>>>> increased internal complexity.
>>>>>>>>>
>>>>>>>>> The layout of resources depends on whether the services and SDK
>>>>>>>>>
>>>>>>>> containers
>>>>>>>>
>>>>>>>>> are co-hosted on the same machine or whether there is a different
>>>>>>>>> architecture in play. In a co-hosted configuration, it seems likely
>>>>>>>>>
>>>>>>>> that
>>>>>>
>>>>>>> the SDK container will get more resources but is dependent on the
>>>>>>>>>
>>>>>>>> runner
>>>>>>
>>>>>>> and pipeline shape (shuffle heavy dominated pipelines will look
>>>>>>>>>
>>>>>>>> different
>>>>>>>>
>>>>>>>>> then ParDo dominated pipelines).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> About executing sub-graphs, would it be true to say that as long
>>>>>>>>>>
>>>>>>>>> as
>>>>>
>>>>>> there's
>>>>>>>>>
>>>>>>>>>> no shuffle, you could keep executing in the same container ?
>>>>>>>>>>
>>>>>>>>> meaning
>>>>>
>>>>>> that
>>>>>>>>
>>>>>>>>> the graph is broken into sub-graphs by shuffles ?
>>>>>>>>>>
>>>>>>>>>> The only thing that is required is that the Apache Beam model is
>>>>>>>>>
>>>>>>>> preserved
>>>>>>>>
>>>>>>>>> so typical break points will be at shuffles and language crossing
>>>>>>>>>
>>>>>>>> points
>>>>>>
>>>>>>> (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
>>>>>>>>>
>>>>>>>> graph
>>>>>>>>
>>>>>>>>> even more for other reasons.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I have to dig-in deeper, so I could have more questions ;-)
>>>>>>>>>>
>>>>>>>>> thanks
>>>>>
>>>>>> Luke!
>>>>>>>>
>>>>>>>>> On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik
>>>>>>>>>>
>>>>>>>>> <lcwik@google.com.invalid
>>>>>>
>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> I updated the PR description to contain the same.
>>>>>>>>>>>
>>>>>>>>>>> I would start by looking at the API/object model definitions
>>>>>>>>>>>
>>>>>>>>>> found
>>>>>
>>>>>> in
>>>>>>>>
>>>>>>>>> beam_fn_api.proto
>>>>>>>>>>> <
>>>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>>>>>
>>>>>>>>>> sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>>>>>>>>>>
>>>>>>>>>>> Then depending on your interest, look at the following:
>>>>>>>>>>> * FnHarness.java
>>>>>>>>>>> <
>>>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>>>>>
>>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/
>>>>>>>>>>
>>>>>>>>> harness/FnHarness.java
>>>>>>>>>
>>>>>>>>>> is the main entry point.
>>>>>>>>>>> * org.apache.beam.fn.harness.data
>>>>>>>>>>> <
>>>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>>>>>
>>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
>>>>>>>>>>
>>>>>>>>>>> contains the most interesting bits of code since it is able to
>>>>>>>>>>>
>>>>>>>>>> multiplex
>>>>>>>>>
>>>>>>>>>> a
>>>>>>>>>>
>>>>>>>>>>> gRPC stream into multiple logical streams of elements bound for
>>>>>>>>>>>
>>>>>>>>>> multiple
>>>>>>>>>
>>>>>>>>>> concurrent process bundle requests. It also contains the code
>>>>>>>>>>>
>>>>>>>>>> to
>>>>>
>>>>>> take
>>>>>>>>
>>>>>>>>> multiple logical outbound streams and multiplex them back onto
>>>>>>>>>>>
>>>>>>>>>> a
>>>>>
>>>>>> gRPC
>>>>>>>>
>>>>>>>>> stream.
>>>>>>>>>>> * org.apache.beam.runners.core
>>>>>>>>>>> <
>>>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>>>>>
>>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/runners/core
>>>>>>>>>>
>>>>>>>>>>> contains additional runners akin to the DoFnRunner found in
>>>>>>>>>>>
>>>>>>>>>> runners-core
>>>>>>>>>
>>>>>>>>>> to
>>>>>>>>>>
>>>>>>>>>>> support sources and gRPC endpoints.
>>>>>>>>>>>
>>>>>>>>>>> Unless your really interested in how domain sockets, epoll, nio
>>>>>>>>>>>
>>>>>>>>>> channel
>>>>>>>>
>>>>>>>>> factories or how stream readiness callbacks work in gRPC, I
>>>>>>>>>>>
>>>>>>>>>> would
>>>>>
>>>>>> avoid
>>>>>>>>
>>>>>>>>> the
>>>>>>>>>>
>>>>>>>>>>> packages org.apache.beam.fn.harness.channel and
>>>>>>>>>>> org.apache.beam.fn.harness.stream. Similarly I would avoid
>>>>>>>>>>> org.apache.beam.fn.harness.fn and
>>>>>>>>>>>
>>>>>>>>>> org.apache.beam.fn.harness.fake
>>>>>
>>>>>> as
>>>>>>>>
>>>>>>>>> they
>>>>>>>>>>
>>>>>>>>>>> don't add anything meaningful to the api.
>>>>>>>>>>>
>>>>>>>>>>> Code package descriptions:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.beam.fn.harness.FnHarness: main entry point
>>>>>>>>>>> org.apache.beam.fn.harness.control: Control service client and individual request handlers
>>>>>>>>>>> org.apache.beam.fn.harness.data: Data service client and logical stream multiplexing
>>>>>>>>>>> org.apache.beam.runners.core: Additional runners akin to the DoFnRunner found in runners-core to support sources and gRPC endpoints
>>>>>>>>>>> org.apache.beam.fn.harness.logging: Logging client implementation and JUL logging handler adapter
>>>>>>>>>>> org.apache.beam.fn.harness.channel: gRPC channel management
>>>>>>>>>>> org.apache.beam.fn.harness.stream: gRPC stream management
>>>>>>>>>>> org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
>>>>>>>>
>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <klk@google.com.invalid>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This is awesome! Any chance you could roadmap the PR for us with some links
>>>>>>>>>>>> into the most interesting bits?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>>>>>>>>>>>> robertwb@google.com.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Also, note that we can still support the "simple" case. For example,
>>>>>>>>>>>>> if the user supplies us with a jar file (as they do now) a runner
>>>>>>>>>>>>> could launch it as a subprocess and communicate with it via this
>>>>>>>>>>>>> same Fn API or install it in a fixed container itself--the user
>>>>>>>>>>>>> doesn't *need* to know about docker or manually manage containers (and
>>>>>>>>>>>>> indeed the Fn API could be used in-process, cross-process,
>>>>>>>>>>>>> cross-container, and even cross-machine).
>>>>>>>>>>>>>
>>>>>>>>>>>>> However docker provides a nice cross-language way of specifying the
>>>>>>>>>>>>> environment including all dependencies (especially for languages like
>>>>>>>>>>>>> Python or C where the equivalent of a cross-platform, self-contained
>>>>>>>>>>>>> jar isn't as easy to produce) and is strictly more powerful and
>>>>>>>>>>>>> flexible (specifically it isolates the runtime environment and one can
>>>>>>>>>>>>> even use it for local testing).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Slicing a worker up like this without sacrificing performance is an
>>>>>>>>>>>>> ambitious goal, but essential to the story of being able to mix and
>>>>>>>>>>>>> match runners and SDKs arbitrarily, and I think this is a great start.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lcwik@google.com.invalid>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You're correct, a docker container is created that contains the execution
>>>>>>>>>>>>>> environment the user wants or the user re-uses an existing one (allowing
>>>>>>>>>>>>>> for a user to embed all their code/dependencies or use a container that can
>>>>>>>>>>>>>> deploy code/dependencies on demand).
>>>>>>>>>>>>>> A user creates a pipeline saying which docker container they want to use
>>>>>>>>>>>>>> (this starts to allow for multiple container definitions within a single
>>>>>>>>>>>>>> pipeline to support multiple languages, versioning, ...).
>>>>>>>>>>>>>> A runner would then be responsible for launching one or more of these
>>>>>>>>>>>>>> containers in a cluster manager of their choice (scaling up or down the
>>>>>>>>>>>>>> number of instances depending on demand/load/...).
>>>>>>>>>>>>>> A runner then interacts with the docker containers over the gRPC service
>>>>>>>>>>>>>> definitions to delegate processing to them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <jb@nanthrax.net>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Luke,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> that's really great and very promising !
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's really ambitious but I like the idea. Just to clarify: the purpose of
>>>>>>>>>>>>>>> using gRPC is once the docker container is running, then we can "interact"
>>>>>>>>>>>>>>> with the container to spread and delegate processing to the docker
>>>>>>>>>>>>>>> container, correct ?
>>>>>>>>>>>>>>> The users/devops have to setup the docker containers as prerequisite.
>>>>>>>>>>>>>>> Then, the "location" of the containers (kind of container registry) is set
>>>>>>>>>>>>>>> via the pipeline options and used by gRPC ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Luke !
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have been prototyping several components towards the Beam technical
>>>>>>>>>>>>>>>> vision of being able to execute an arbitrary language using an arbitrary
>>>>>>>>>>>>>>>> runner.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would like to share this overview [1] of what I have been working
>>>>>>>>>>>>>>>> towards. I also share this PR [2] with a proposed API, service definitions
>>>>>>>>>>>>>>>> and partial implementation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1: https://s.apache.org/beam-fn-api
>>>>>>>>>>>>>>>> 2: https://github.com/apache/beam/pull/1801
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please comment on the overview within this thread, and any specific code
>>>>>>>>>>>>>>>> comments on the PR directly.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Luke
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>>> jbonofre@apache.org
>>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>
>>>>
>
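
The quoted roadmap above says org.apache.beam.fn.harness.data splits one gRPC
stream into several logical streams and merges outbound logical streams back
onto one. A minimal sketch of that idea follows. It is not the harness code:
the (stream id, payload) Frame tuple, the Demultiplexer class, and the
multiplex function are hypothetical names, and real Fn API messages are protos
rather than tuples.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Hypothetical frame: (logical stream id, payload bytes).
Frame = Tuple[str, bytes]


class Demultiplexer:
    """Routes frames from one physical stream into per-id logical streams."""

    def __init__(self) -> None:
        self._queues: Dict[str, List[bytes]] = defaultdict(list)

    def on_frame(self, frame: Frame) -> None:
        # Append the payload to the logical stream named by the frame's id.
        stream_id, payload = frame
        self._queues[stream_id].append(payload)

    def logical_stream(self, stream_id: str) -> List[bytes]:
        # Return a copy of the payloads received so far for this id.
        return list(self._queues[stream_id])


def multiplex(outbound: Dict[str, Iterable[bytes]]) -> List[Frame]:
    """Interleaves several logical streams onto one frame sequence, round-robin."""
    iterators = {sid: iter(payloads) for sid, payloads in outbound.items()}
    frames: List[Frame] = []
    while iterators:
        for sid in list(iterators):
            try:
                frames.append((sid, next(iterators[sid])))
            except StopIteration:
                # This logical stream is exhausted; stop visiting it.
                del iterators[sid]
    return frames
```

Tagging every frame with its logical stream id is what lets several concurrent
process bundle requests share one physical connection; the round-robin order
here is only one possible interleaving policy.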

Re: Beam Fn API

Posted by Etienne Chauchot <ec...@gmail.com>.
Thanks for all these docs! They are exactly what was needed for new 
contributors as discussed in this thread

https://lists.apache.org/thread.html/ac93d29424e19d57097373b78f3f5bcbc701e4b51385a52a6e27b7ed@%3Cdev.beam.apache.org%3E

Etienne

On 31/05/2017 at 11:12, Aljoscha Krettek wrote:
> Thanks for banging these out Lukasz. I’ll try and read them all this week.
>
> We’re also planning to add support for the Fn API to the Flink Runner so that we can execute Python programs. I’m sure we’ll get some valuable feedback for you while doing that.
>
>> On 26. May 2017, at 22:49, Lukasz Cwik <lc...@google.com.INVALID> wrote:
>>
>> I would like to share another document about the Fn API. This document
>> specifically discusses how to access side inputs, access remote references
>> (e.g. large iterables for hot keys produced by a GBK), and support user
>> state.
>> https://s.apache.org/beam-fn-state-api-and-bundle-processing
>>
>> The document does require a strong foundation in the Apache Beam model and
>> a good understanding of the prior shared docs:
>> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
>> * How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data
>>
>> I could really use the help of runner contributors to review the caching
>> semantics within the SDK harness and whether they would work well for the
>> runner they contribute to the most.
>>
>> On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Manu, the goal is to share here initially, update the docs addressing
>>> people's comments, and then publish them on the website once they are
>>> stable enough.
>>>
>>> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Lukasz. The following two links were somehow incorrectly formatted
>>>> in your mail.
>>>>
>>>> * How to process a bundle:
>>>> https://s.apache.org/beam-fn-api-processing-a-bundle
>>>> * How to send and receive data:
>>>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>>>
>>>> By the way, is there a way to find them from the Beam website ?
>>>>
>>>>
>>>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>>>> wrote:
>>>>
>>>>> Now that I'm back from vacation and the 2.0.0 release is not taking all my
>>>>> time, I am focusing my attention on working on the Beam Portability
>>>>> framework, specifically the Fn API so that we can get Python and other
>>>>> language integrations to work with any runner.
>>>>>
>>>>> For newcomers, I would like to reshare the overview:
>>>>> https://s.apache.org/beam-fn-api
>>>>>
>>>>> And for those of you who have been following this thread and contributors
>>>>> focusing on Runner integration with Apache Beam:
>>>>> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
>>>>> * How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data
>>>>>
>>>>> If you want to dive deeper, you should look at:
>>>>> * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
>>>>> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>>>>> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/java/harness
>>>>> * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker
>>>>>
>>>>> Next I'm planning on talking about Beam Fn State API and will need help
>>>>> from Runner contributors to talk about caching semantics and key spaces and
>>>>> whether the integrations mesh well with current Runner implementations. The
>>>>> State API is meant to support user state, side inputs, and re-iteration for
>>>>> large values produced by GroupByKey.
>>>>>
>>>>> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Yes, I was using a Pipeline that was:
>>>>>> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
>>>>>> pipeline in the global window using the default trigger)
>>>>>>
>>>>>> In Google Cloud Dataflow, the shuffle step uses the binary representation
>>>>>> to compare keys, so the above pipeline would normally be converted to the
>>>>>> following two stages:
>>>>>> Read -> GBK Writer
>>>>>> GBK Reader -> IdentityParDo
>>>>>>
>>>>>> Note that the GBK Writer and GBK Reader need to use a coder to encode and
>>>>>> decode the value.
>>>>>>
>>>>>> When using the Fn API, those two stages expanded because of the Fn API
>>>>>> crossings using a gRPC Write/Read pair:
>>>>>> Read -> gRPC Write -> gRPC Read -> GBK Writer
>>>>>> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>>>>>>
>>>>>> In my naive prototype implementation, the coder was used to encode
>>>>>> elements at the gRPC steps. This meant that the coder was
>>>>>> encoding/decoding/encoding in the first stage and
>>>>>> decoding/encoding/decoding in the second stage. This tripled the number of
>>>>>> times the coder was being invoked per element. This additional use of the
>>>>>> coder accounted for ~12% (80% of the 15%) of the extra execution time. This
>>>>>> implementation is quite inefficient and would benefit from merging the gRPC
>>>>>> Read + GBK Writer into one actor and also the GBK Reader + gRPC Write into
>>>>>> another actor allowing for the creation of a fast path that can skip parts
>>>>>> of the decode/encode cycle through the coder. By using a byte array view
>>>>>> over the logical stream, one can minimize the number of byte array copies
>>>>>> which plagued my naive implementation. This can be done by only parsing the
>>>>>> element boundaries out of the stream to produce those logical byte array
>>>>>> views. I have a very rough estimate that performing this optimization would
>>>>>> reduce the 12% overhead to somewhere between 4% and 6%.
>>>>>>
>>>>>> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>>>>>> marshalling/unmarshalling protos
>>>>>> handling/managing the socket
>>>>>> flow control
>>>>>> ...
>>>>>>
>>>>>> Finally, I did try experiments with different buffer sizes (10KB, 100KB,
>>>>>> 1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
>>>>>> and channel type [3] (NIO, epoll, domain socket), but coder overhead easily
>>>>>> dominated the differences in these other experiments.
>>>>>>
>>>>>> Further analysis would need to be done to more accurately distill this
>>>>>> down.
>>>>>>
>>>>>> 1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
>>>>>> 2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
>>>>>> 3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:
>>>>>>
>>>>>>> Awesome job Lukasz, Excellent, I have to confess the first time I heard about
>>>>>>> the Fn API idea I was a bit incredulous, but you are making it real,
>>>>>>> amazing!
>>>>>>>
>>>>>>> Just one question from your document, you said that 80% of the extra (15%) time
>>>>>>> goes into encoding and decoding the data for your test case, can you expand
>>>>>>> on your current ideas to improve this? (I am not sure I completely understand
>>>>>>> the issue).
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Responded inline.
>>>>>>>>
>>>>>>>> On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> This is truly amazing Luke!
>>>>>>>>>
>>>>>>>>> If I understand this right, the runner executing the DoFn will delegate the
>>>>>>>>> function code and input data (and state, coders, etc.) to the container
>>>>>>>>> where it will execute with the user's SDK of choice, right ?
>>>>>>>>
>>>>>>>> Yes, that is correct.
>>>>>>>>
>>>>>>>>
>>>>>>>>> I wonder how the containers relate to the underlying engine's worker
>>>>>>>>> processes ? is it a 1-1, container per worker ? if there's less "work" for
>>>>>>>>> the worker's Java process (for example) now and it becomes a sort of
>>>>>>>>> "dispatcher", would that change the resource allocation commonly used for
>>>>>>>>> the same Pipeline so that the worker processes would require less
>>>>>>>>> resources, while giving those to the container ?
>>>>>>>>>
>>>>>>>> I think with the four services (control, data, state, logging) you can go
>>>>>>>> with a 1-1 relationship or break it up more finely grained and dedicate
>>>>>>>> some machines to have specific tasks. Like you could have a few machines
>>>>>>>> dedicated to log aggregation which all the workers push their logs to.
>>>>>>>> Similarly, you could have some machines that have a lot of memory which
>>>>>>>> would be better to be able to do shuffles in memory and then this cluster
>>>>>>>> of high memory machines could front the data service. I believe there is a
>>>>>>>> lot of flexibility based upon what a runner can do and what it specializes
>>>>>>>> in and believe that with more effort comes more possibilities albeit with
>>>>>>>> increased internal complexity.
>>>>>>>>
>>>>>>>> The layout of resources depends on whether the services and SDK containers
>>>>>>>> are co-hosted on the same machine or whether there is a different
>>>>>>>> architecture in play. In a co-hosted configuration, it seems likely that
>>>>>>>> the SDK container will get more resources but is dependent on the runner
>>>>>>>> and pipeline shape (shuffle heavy dominated pipelines will look different
>>>>>>>> than ParDo dominated pipelines).
>>>>>>>>
>>>>>>>>
>>>>>>>>> About executing sub-graphs, would it be true to say that as long as there's
>>>>>>>>> no shuffle, you could keep executing in the same container ? meaning that
>>>>>>>>> the graph is broken into sub-graphs by shuffles ?
>>>>>>>>>
>>>>>>>> The only thing that is required is that the Apache Beam model is preserved
>>>>>>>> so typical break points will be at shuffles and language crossing points
>>>>>>>> (e.g. Python ParDo -> Java ParDo). A runner is free to break up the graph
>>>>>>>> even more for other reasons.
>>>>>>>>
>>>>>>>>
>>>>>>>>> I have to dig-in deeper, so I could have more questions ;-) thanks Luke!
>>>>>>>>>
>>>>>>>>> On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lcwik@google.com.invalid>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I updated the PR description to contain the same.
>>>>>>>>>>
>>>>>>>>>> I would start by looking at the API/object model definitions found in
>>>>>>>>>> beam_fn_api.proto
>>>>>>>>>> <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>
>>>>>>>>>>
>>>>>>>>>> Then depending on your interest, look at the following:
>>>>>>>>>> * FnHarness.java
>>>>>>>>>> <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java>
>>>>>>>>>> is the main entry point.
>>>>>>>>>> * org.apache.beam.fn.harness.data
>>>>>>>>>> <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
>>>>>>>>>> contains the most interesting bits of code since it is able to multiplex a
>>>>>>>>>> gRPC stream into multiple logical streams of elements bound for multiple
>>>>>>>>>> concurrent process bundle requests. It also contains the code to take
>>>>>>>>>> multiple logical outbound streams and multiplex them back onto a gRPC
>>>>>>>>>> stream.
>>>>>>>>>> * org.apache.beam.runners.core
>>>>>>>>>> <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
>>>>>>>>>> contains additional runners akin to the DoFnRunner found in runners-core to
>>>>>>>>>> support sources and gRPC endpoints.
>>>>>>>>>>
>>>>>>>>>> Unless you're really interested in how domain sockets, epoll, and NIO channel
>>>>>>>>>> factories work, or in how stream readiness callbacks work in gRPC, I would avoid
>>>>>>>>>> the packages org.apache.beam.fn.harness.channel and
>>>>>>>>>> org.apache.beam.fn.harness.stream. Similarly, I would avoid
>>>>>>>>>> org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as they
>>>>>>>>>> don't add anything meaningful to the API.
>>>>>>>>>>
>>>>>>>>>> Code package descriptions:
>>>>>>>>>>
>>>>>>>>>> org.apache.beam.fn.harness.FnHarness: main entry point
>>>>>>>>>> org.apache.beam.fn.harness.control: Control service client and individual request handlers
>>>>>>>>>> org.apache.beam.fn.harness.data: Data service client and logical stream multiplexing
>>>>>>>>>> org.apache.beam.runners.core: Additional runners akin to the DoFnRunner found in runners-core to support sources and gRPC endpoints
>>>>>>>>>> org.apache.beam.fn.harness.logging: Logging client implementation and JUL logging handler adapter
>>>>>>>>>> org.apache.beam.fn.harness.channel: gRPC channel management
>>>>>>>>>> org.apache.beam.fn.harness.stream: gRPC stream management
>>>>>>>>>> org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
>>>>>>>> <klk@google.com.invalid
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is awesome! Any chance you could roadmap the PR for us
>>>> with
>>>>>>> some
>>>>>>>>>> links
>>>>>>>>>>> into the most interesting bits?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>>>>>>>>>>> robertwb@google.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Also, note that we can still support the "simple" case. For
>>>>>>>> example,
>>>>>>>>>>>> if the user supplies us with a jar file (as they do now) a
>>>>>>> runner
>>>>>>>>>>>> could launch it as a subprocess and communicate with it
>>>> via
>>>>>>> this
>>>>>>>>>>>> same Fn API or install it in a fixed container itself--the
>>>>> user
>>>>>>>>>>>> doesn't *need* to know about docker or manually manage
>>>>>>> containers
>>>>>>>>> (and
>>>>>>>>>>>> indeed the Fn API could be used in-process, cross-process,
>>>>>>>>>>>> cross-container, and even cross-machine).
>>>>>>>>>>>>
>>>>>>>>>>>> However docker provides a nice cross-language way of
>>>>> specifying
>>>>>>> the
>>>>>>>>>>>> environment including all dependencies (especially for
>>>>> languages
>>>>>>>> like
>>>>>>>>>>>> Python or C where the equivalent of a cross-platform,
>>>>>>>> self-contained
>>>>>>>>>>>> jar isn't as easy to produce) and is strictly more powerful
>>>>> and
>>>>>>>>>>>> flexible (specifically it isolates the runtime environment
>>>> and
>>>>>>> one
>>>>>>>>> can
>>>>>>>>>>>> even use it for local testing).
>>>>>>>>>>>>
>>>>>>>>>>>> Slicing a worker up like this without sacrificing
>>>> performance
>>>>>>> is an
>>>>>>>>>>>> ambitious goal, but essential to the story of being able to
>>>>> mix
>>>>>>> and
>>>>>>>>>>>> match runners and SDKs arbitrarily, and I think this is a
>>>>> great
>>>>>>>>> start.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
>>>>>>>>> <lcwik@google.com.invalid
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> You're correct, a docker container is created that contains
>>>>> the
>>>>>>>>>> execution
>>>>>>>>>>>>> environment the user wants or the user re-uses an
>>>> existing
>>>>> one
>>>>>>>>>>> (allowing
>>>>>>>>>>>>> for a user to embed all their code/dependencies or use a
>>>>>>>> container
>>>>>>>>>> that
>>>>>>>>>>>> can
>>>>>>>>>>>>> deploy code/dependencies on demand).
>>>>>>>>>>>>> A user creates a pipeline saying which docker container
>>>> they
>>>>>>> want
>>>>>>>>> to
>>>>>>>>>>> use
>>>>>>>>>>>>> (this starts to allow for multiple container definitions
>>>>>>> within a
>>>>>>>>>>> single
>>>>>>>>>>>>> pipeline to support multiple languages, versioning, ...).
>>>>>>>>>>>>> A runner would then be responsible for launching one or
>>>> more
>>>>>>> of
>>>>>>>>> these
>>>>>>>>>>>>> containers in a cluster manager of their choice (scaling
>>>> up
>>>>> or
>>>>>>>> down
>>>>>>>>>> the
>>>>>>>>>>>>> number of instances depending on demand/load/...).
>>>>>>>>>>>>> A runner then interacts with the docker containers over
>>>> the
>>>>>>> gRPC
>>>>>>>>>>> service
>>>>>>>>>>>>> definitions to delegate processing to.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
>>>>>>>>>> jb@nanthrax.net
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Luke,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> that's really great and very promising !
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It's really ambitious but I like the idea. Just to
>>>> clarify:
>>>>>>> the
>>>>>>>>>>> purpose
>>>>>>>>>>>> of
>>>>>>>>>>>>>> using gRPC is once the docker container is running,
>>>> then we
>>>>>>> can
>>>>>>>>>>>> "interact"
>>>>>>>>>>>>>> with the container to spread and delegate processing to
>>>> the
>>>>>>>> docker
>>>>>>>>>>>>>> container, correct ?
>>>>>>>>>>>>>> The users/devops have to setup the docker containers as
>>>>>>>>>> prerequisite.
>>>>>>>>>>>>>> Then, the "location" of the containers (kind of
>>>> container
>>>>>>>>> registry)
>>>>>>>>>> is
>>>>>>>>>>>> set
>>>>>>>>>>>>>> via the pipeline options and used by gRPC ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Luke !
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have been prototyping several components towards the
>>>>> Beam
>>>>>>>>>> technical
>>>>>>>>>>>>>>> vision of being able to execute an arbitrary language
>>>>> using
>>>>>>> an
>>>>>>>>>>>> arbitrary
>>>>>>>>>>>>>>> runner.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would like to share this overview [1] of what I have
>>>>> been
>>>>>>>>> working
>>>>>>>>>>>>>>> towards. I also share this PR [2] with a proposed API,
>>>>>>> service
>>>>>>>>>>>> definitions
>>>>>>>>>>>>>>> and partial implementation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1: https://s.apache.org/beam-fn-api
>>>>>>>>>>>>>>> 2: https://github.com/apache/beam/pull/1801
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please comment on the overview within this thread, and
>>>> any
>>>>>>>>> specific
>>>>>>>>>>>> code
>>>>>>>>>>>>>>> comments on the PR directly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Luke
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>> jbonofre@apache.org
>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>>>
>>>>>>
>>>
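
The quoted Jan 24 message suggests parsing only element boundaries out of the
stream and handing out byte array views, rather than decoding and re-encoding
every element. A minimal sketch of that technique follows. The framing is an
assumption made for illustration (a 4-byte big-endian length before each
element) and is not the Fn API wire format; frame_elements and element_views
are hypothetical names.

```python
import struct
from typing import Iterator, List


def frame_elements(elements: List[bytes]) -> bytes:
    """Concatenates elements, each preceded by an assumed 4-byte length prefix."""
    return b"".join(struct.pack(">I", len(e)) + e for e in elements)


def element_views(buffer: bytes) -> Iterator[memoryview]:
    """Yields one zero-copy view per element by reading only the length prefixes."""
    view = memoryview(buffer)
    offset = 0
    while offset < len(view):
        # Read the element boundary; the element payload itself is never decoded.
        (length,) = struct.unpack_from(">I", view, offset)
        offset += 4
        # Slicing a memoryview shares the underlying buffer instead of copying.
        yield view[offset:offset + length]
        offset += length
```

Each yielded view can be passed straight to the next writer, so the coder only
needs to run where an element's value is actually inspected.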


Re: Beam Fn API

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks for banging these out Lukasz. I’ll try and read them all this week.

We’re also planning to add support for the Fn API to the Flink Runner so that we can execute Python programs. I’m sure we’ll get some valuable feedback for you while doing that.

> On 26. May 2017, at 22:49, Lukasz Cwik <lc...@google.com.INVALID> wrote:
> 
> I would like to share another document about the Fn API. This document
> specifically discusses how to access side inputs, access remote references
> (e.g. large iterables for hot keys produced by a GBK), and support user
> state.
> https://s.apache.org/beam-fn-state-api-and-bundle-processing
> 
> The document does require a strong foundation in the Apache Beam model and
> a good understanding of the prior shared docs:
> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
> * How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data
> 
> I could really use the help of runner contributors to review the caching
> semantics within the SDK harness and whether they would work well for the
> runner they contribute to the most.
> 
> On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:
> 
>> Manu, the goal is to share here initially, update the docs addressing
>> people's comments, and then publish them on the website once they are
>> stable enough.
>> 
>> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com>
>> wrote:
>> 
>>> Thanks Lukasz. The following two links were somehow incorrectly formatted
>>> in your mail.
>>> 
>>> * How to process a bundle:
>>> https://s.apache.org/beam-fn-api-processing-a-bundle
>>> * How to send and receive data:
>>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>> 
>>> By the way, is there a way to find them from the Beam website ?
>>> 
>>> 
>>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>>> wrote:
>>> 
>>>> Now that I'm back from vacation and the 2.0.0 release is not taking all my
>>>> time, I am focusing my attention on working on the Beam Portability
>>>> framework, specifically the Fn API so that we can get Python and other
>>>> language integrations to work with any runner.
>>>>
>>>> For newcomers, I would like to reshare the overview:
>>>> https://s.apache.org/beam-fn-api
>>>>
>>>> And for those of you who have been following this thread and contributors
>>>> focusing on Runner integration with Apache Beam:
>>>> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-bundle
>>>> * How to send and receive data: https://s.apache.org/beam-fn-api-send-and-receive-data
>>>>
>>>> If you want to dive deeper, you should look at:
>>>> * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
>>>> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>>>> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/java/harness
>>>> * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker
>>>>
>>>> Next I'm planning on talking about Beam Fn State API and will need help
>>>> from Runner contributors to talk about caching semantics and key spaces and
>>>> whether the integrations mesh well with current Runner implementations. The
>>>> State API is meant to support user state, side inputs, and re-iteration for
>>>> large values produced by GroupByKey.
>>>> 
>>>> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>> 
>>>>> Yes, I was using a Pipeline that was:
>>>>> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
>>>>> pipeline in the global window using the default trigger)
>>>>> 
>>>>> In Google Cloud Dataflow, the shuffle step uses the binary representation
>>>>> to compare keys, so the above pipeline would normally be converted to the
>>>>> following two stages:
>>>>> Read -> GBK Writer
>>>>> GBK Reader -> IdentityParDo
>>>>> 
>>>>> Note that the GBK Writer and GBK Reader need to use a coder to encode and
>>>>> decode the value.
>>>>> 
>>>>> When using the Fn API, those two stages expanded because of the Fn API
>>>>> crossings using a gRPC Write/Read pair:
>>>>> Read -> gRPC Write -> gRPC Read -> GBK Writer
>>>>> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>>>>> 
>>>>> In my naive prototype implementation, the coder was used to encode
>>>>> elements at the gRPC steps. This meant that the coder was
>>>>> encoding/decoding/encoding in the first stage and
>>>>> decoding/encoding/decoding in the second stage. This tripled the number
>>>>> of times the coder was being invoked per element. This additional use of
>>>>> the coder accounted for ~12% (80% of the 15%) of the extra execution time.
>>>>> This implementation is quite inefficient and would benefit from merging
>>>>> the gRPC Read + GBK Writer into one actor and also the GBK Reader + gRPC
>>>>> Write into another actor, allowing for the creation of a fast path that
>>>>> can skip parts of the decode/encode cycle through the coder. By using a
>>>>> byte array view over the logical stream, one can minimize the number of
>>>>> byte array copies which plagued my naive implementation. This can be done
>>>>> by only parsing the element boundaries out of the stream to produce those
>>>>> logical byte array views. I have a very rough estimate that performing
>>>>> this optimization would reduce the 12% overhead to somewhere between 4%
>>>>> and 6%.
>>>>> 
>>>>> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>>>>> marshalling/unmarshalling protos
>>>>> handling/managing the socket
>>>>> flow control
>>>>> ...
>>>>> 
>>>>> Finally, I did try experiments with different buffer sizes (10KB, 100KB,
>>>>> 1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
>>>>> and channel type [3] (NIO, epoll, domain socket), but coder overhead easily
>>>>> dominated the differences in these other experiments.
>>>>> 
>>>>> Further analysis would need to be done to more accurately distill this
>>>>> down.
>>>>> 
>>>>> 1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
>>>>> 2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
>>>>> 3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
>>>>> 
>>>>> 
>>>>> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:
>>>>> 
>>>>>> Awesome job Lukasz, Excellent, I have to confess the first time I heard
>>>>>> about the Fn API idea I was a bit incredulous, but you are making it real,
>>>>>> amazing!
>>>>>> 
>>>>>> Just one question from your document, you said that 80% of the extra (15%)
>>>>>> time goes into encoding and decoding the data for your test case, can you
>>>>>> expand on your current ideas to improve this? (I am not sure I completely
>>>>>> understand the issue).
>>>>>> 
>>>>>> 
>>>>>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid>
>>>>>> wrote:
>>>>>> 
>>>>>>> Responded inline.
>>>>>>> 
>>>>>>> On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> This is truly amazing Luke!
>>>>>>>> 
>>>>>>>> If I understand this right, the runner executing the DoFn will delegate
>>>>>>>> the function code and input data (and state, coders, etc.) to the
>>>>>>>> container where it will execute with the user's SDK of choice, right ?
>>>>>>> 
>>>>>>> 
>>>>>>> Yes, that is correct.
>>>>>>> 
>>>>>>> 
>>>>>>>> I wonder how the containers relate to the underlying engine's worker
>>>>>>>> processes? Is it 1-1, one container per worker? If there's less "work"
>>>>>>>> for the worker's Java process (for example) now and it becomes a sort of
>>>>>>>> "dispatcher", would that change the resource allocation commonly used for
>>>>>>>> the same Pipeline so that the worker processes would require fewer
>>>>>>>> resources, while giving those to the container?
>>>>>>>> 
>>>>>>> 
>>>>>>> I think with the four services (control, data, state, logging) you can go
>>>>>>> with a 1-1 relationship or break it up more finely and dedicate some
>>>>>>> machines to specific tasks. For example, you could have a few machines
>>>>>>> dedicated to log aggregation which all the workers push their logs to.
>>>>>>> Similarly, you could have some machines with a lot of memory, which would
>>>>>>> be better able to do shuffles in memory, and this cluster of high memory
>>>>>>> machines could front the data service. I believe there is a lot of
>>>>>>> flexibility based upon what a runner can do and what it specializes in, and
>>>>>>> that with more effort come more possibilities, albeit with increased
>>>>>>> internal complexity.
>>>>>>> 
>>>>>>> The layout of resources depends on whether the services and SDK containers
>>>>>>> are co-hosted on the same machine or whether there is a different
>>>>>>> architecture in play. In a co-hosted configuration, it seems likely that
>>>>>>> the SDK container will get more resources, but that depends on the runner
>>>>>>> and pipeline shape (shuffle-heavy pipelines will look different than
>>>>>>> ParDo-dominated pipelines).
>>>>>>> 
>>>>>>> 
>>>>>>>> About executing sub-graphs, would it be true to say that as long as
>>>>>>>> there's no shuffle, you could keep executing in the same container ?
>>>>>>>> meaning that the graph is broken into sub-graphs by shuffles ?
>>>>>>>> 
>>>>>>> 
>>>>>>> The only thing that is required is that the Apache Beam model is preserved
>>>>>>> so typical break points will be at shuffles and language crossing points
>>>>>>> (e.g. Python ParDo -> Java ParDo). A runner is free to break up the graph
>>>>>>> even more for other reasons.
>>>>>>> 
>>>>>>> 
>>>>>>>> I have to dig-in deeper, so I could have more questions ;-) thanks Luke!
>>>>>>>> 
>>>>>>>> On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lcwik@google.com.invalid>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> I updated the PR description to contain the same.
>>>>>>>>> 
>>>>>>>>> I would start by looking at the API/object model definitions found in
>>>>>>>>> beam_fn_api.proto
>>>>>>>>> <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>
>>>>>>>>> 
>>>>>>>>> Then depending on your interest, look at the following:
>>>>>>>>> * FnHarness.java
>>>>>>>>> <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java>
>>>>>>>>> is the main entry point.
>>>>>>>>> * org.apache.beam.fn.harness.data
>>>>>>>>> <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
>>>>>>>>> contains the most interesting bits of code since it is able to
>>>>>>>>> demultiplex a gRPC stream into multiple logical streams of elements bound
>>>>>>>>> for multiple concurrent process bundle requests. It also contains the
>>>>>>>>> code to take multiple logical outbound streams and multiplex them back
>>>>>>>>> onto a gRPC stream.
>>>>>>>>> * org.apache.beam.runners.core
>>>>>>>>> <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
>>>>>>>>> contains additional runners akin to the DoFnRunner found in runners-core
>>>>>>>>> to support sources and gRPC endpoints.
>>>>>>>>> 
>>>>>>>>> Unless you're really interested in how domain sockets, epoll, and NIO
>>>>>>>>> channel factories or stream readiness callbacks work in gRPC, I would
>>>>>>>>> avoid the packages org.apache.beam.fn.harness.channel and
>>>>>>>>> org.apache.beam.fn.harness.stream. Similarly I would avoid
>>>>>>>>> org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as they
>>>>>>>>> don't add anything meaningful to the API.
>>>>>>>>> 
>>>>>>>>> Code package descriptions:
>>>>>>>>> 
>>>>>>>>> org.apache.beam.fn.harness.FnHarness: main entry point
>>>>>>>>> org.apache.beam.fn.harness.control: Control service client and individual
>>>>>>>>> request handlers
>>>>>>>>> org.apache.beam.fn.harness.data: Data service client and logical stream
>>>>>>>>> multiplexing
>>>>>>>>> org.apache.beam.runners.core: Additional runners akin to the DoFnRunner
>>>>>>>>> found in runners-core to support sources and gRPC endpoints
>>>>>>>>> org.apache.beam.fn.harness.logging: Logging client implementation and JUL
>>>>>>>>> logging handler adapter
>>>>>>>>> org.apache.beam.fn.harness.channel: gRPC channel management
>>>>>>>>> org.apache.beam.fn.harness.stream: gRPC stream management
>>>>>>>>> org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <klk@google.com.invalid>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> This is awesome! Any chance you could roadmap the PR for us with some
>>>>>>>>>> links into the most interesting bits?
>>>>>>>>>> 
>>>>>>>>>> On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <robertwb@google.com.invalid>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Also, note that we can still support the "simple" case. For example,
>>>>>>>>>>> if the user supplies us with a jar file (as they do now) a runner
>>>>>>>>>>> could launch it as a subprocess and communicate with it via this
>>>>>>>>>>> same Fn API or install it in a fixed container itself--the user
>>>>>>>>>>> doesn't *need* to know about docker or manually manage containers (and
>>>>>>>>>>> indeed the Fn API could be used in-process, cross-process,
>>>>>>>>>>> cross-container, and even cross-machine).
>>>>>>>>>>> 
>>>>>>>>>>> However docker provides a nice cross-language way of specifying the
>>>>>>>>>>> environment including all dependencies (especially for languages like
>>>>>>>>>>> Python or C where the equivalent of a cross-platform, self-contained
>>>>>>>>>>> jar isn't as easy to produce) and is strictly more powerful and
>>>>>>>>>>> flexible (specifically it isolates the runtime environment and one can
>>>>>>>>>>> even use it for local testing).
>>>>>>>>>>> 
>>>>>>>>>>> Slicing a worker up like this without sacrificing performance is an
>>>>>>>>>>> ambitious goal, but essential to the story of being able to mix and
>>>>>>>>>>> match runners and SDKs arbitrarily, and I think this is a great start.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lcwik@google.com.invalid>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> You're correct, a docker container is created that contains the
>>>>>>>>>>>> execution environment the user wants or the user re-uses an existing
>>>>>>>>>>>> one (allowing for a user to embed all their code/dependencies or use a
>>>>>>>>>>>> container that can deploy code/dependencies on demand).
>>>>>>>>>>>> A user creates a pipeline saying which docker container they want to
>>>>>>>>>>>> use (this starts to allow for multiple container definitions within a
>>>>>>>>>>>> single pipeline to support multiple languages, versioning, ...).
>>>>>>>>>>>> A runner would then be responsible for launching one or more of these
>>>>>>>>>>>> containers in a cluster manager of their choice (scaling up or down
>>>>>>>>>>>> the number of instances depending on demand/load/...).
>>>>>>>>>>>> A runner then interacts with the docker containers over the gRPC
>>>>>>>>>>>> service definitions to delegate processing to them.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <jb@nanthrax.net>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Luke,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> that's really great and very promising !
>>>>>>>>>>>>> 
>>>>>>>>>>>>> It's really ambitious but I like the idea. Just to clarify: the
>>>>>>>>>>>>> purpose of using gRPC is once the docker container is running, then
>>>>>>>>>>>>> we can "interact" with the container to spread and delegate
>>>>>>>>>>>>> processing to the docker container, correct ?
>>>>>>>>>>>>> The users/devops have to set up the docker containers as a
>>>>>>>>>>>>> prerequisite.
>>>>>>>>>>>>> Then, the "location" of the containers (kind of container registry)
>>>>>>>>>>>>> is set via the pipeline options and used by gRPC ?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks Luke !
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> JB
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have been prototyping several components towards the Beam
>>>>>>>>>>>>>> technical vision of being able to execute an arbitrary language
>>>>>>>>>>>>>> using an arbitrary runner.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I would like to share this overview [1] of what I have been working
>>>>>>>>>>>>>> towards. I also share this PR [2] with a proposed API, service
>>>>>>>>>>>>>> definitions and partial implementation.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1: https://s.apache.org/beam-fn-api
>>>>>>>>>>>>>> 2: https://github.com/apache/beam/pull/1801
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Please comment on the overview within this thread, and any specific
>>>>>>>>>>>>>> code comments on the PR directly.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Luke
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>> jbonofre@apache.org
>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 


Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
I would like to share another document about the Fn API. This document
specifically discusses how to access side inputs, access remote references
(e.g. large iterables for hot keys produced by a GBK), and support user
state.
https://s.apache.org/beam-fn-state-api-and-bundle-processing

The document does require a strong foundation in the Apache Beam model and
a good understanding of the prior shared docs:
* How to process a bundle:
https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data:
https://s.apache.org/beam-fn-api-send-and-receive-data

I could really use the help of runner contributors to review the caching
semantics within the SDK harness and whether they would work well for the
runner they contribute to the most.

On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:

> Manu, the goal is to share here initially, update the docs addressing
> people's comments, and then publish them on the website once they are
> stable enough.
>
> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com>
> wrote:
>
>> Thanks Lukasz. The following two links were somehow incorrectly formatted
>> in your mail.
>>
>> * How to process a bundle:
>> https://s.apache.org/beam-fn-api-processing-a-bundle
>> * How to send and receive data:
>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>
>> By the way, is there a way to find them from the Beam website ?
>>
>>
>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>>
>> > Now that I'm back from vacation and the 2.0.0 release is not taking all my
>> > time, I am focusing my attention on working on the Beam Portability
>> > framework, specifically the Fn API so that we can get Python and other
>> > language integrations to work with any runner.
>> >
>> > For newcomers, I would like to reshare the overview:
>> > https://s.apache.org/beam-fn-api
>> >
>> > And for those of you who have been following this thread and contributors
>> > focusing on Runner integration with Apache Beam:
>> > * How to process a bundle:
>> > https://s.apache.org/beam-fn-api-processing-a-bundle
>> > * How to send and receive data:
>> > https://s.apache.org/beam-fn-api-send-and-receive-data
>> >
>> > If you want to dive deeper, you should look at:
>> > * Runner API Protobuf:
>> > https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
>> > * Fn API Protobuf:
>> > https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>> > * Java SDK Harness:
>> > https://github.com/apache/beam/tree/master/sdks/java/harness
>> > * Python SDK Harness:
>> > https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker
>> >
>> > Next I'm planning on talking about Beam Fn State API and will need help
>> > from Runner contributors to talk about caching semantics and key spaces and
>> > whether the integrations mesh well with current Runner implementations. The
>> > State API is meant to support user state, side inputs, and re-iteration for
>> > large values produced by GroupByKey.
>> >
>> > On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > > Yes, I was using a Pipeline that was:
>> > > Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
>> > > pipeline in the global window using the default trigger)
>> > >
>> > > In Google Cloud Dataflow, the shuffle step uses the binary representation
>> > > to compare keys, so the above pipeline would normally be converted to the
>> > > following two stages:
>> > > Read -> GBK Writer
>> > > GBK Reader -> IdentityParDo
>> > >
>> > > Note that the GBK Writer and GBK Reader need to use a coder to encode and
>> > > decode the value.
>> > >
>> > > When using the Fn API, those two stages expanded because of the Fn API
>> > > crossings using a gRPC Write/Read pair:
>> > > Read -> gRPC Write -> gRPC Read -> GBK Writer
>> > > GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>> > >
>> > > In my naive prototype implementation, the coder was used to encode
>> > > elements at the gRPC steps. This meant that the coder was
>> > > encoding/decoding/encoding in the first stage and
>> > > decoding/encoding/decoding in the second stage. This tripled the number
>> > > of times the coder was being invoked per element. This additional use of
>> > > the coder accounted for ~12% (80% of the 15%) of the extra execution time.
>> > > This implementation is quite inefficient and would benefit from merging
>> > > the gRPC Read + GBK Writer into one actor and also the GBK Reader + gRPC
>> > > Write into another actor, allowing for the creation of a fast path that
>> > > can skip parts of the decode/encode cycle through the coder. By using a
>> > > byte array view over the logical stream, one can minimize the number of
>> > > byte array copies which plagued my naive implementation. This can be done
>> > > by only parsing the element boundaries out of the stream to produce those
>> > > logical byte array views. I have a very rough estimate that performing
>> > > this optimization would reduce the 12% overhead to somewhere between 4%
>> > > and 6%.
>> > >
>> > > The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>> > > marshalling/unmarshalling protos
>> > > handling/managing the socket
>> > > flow control
>> > > ...
>> > >
>> > > Finally, I did try experiments with different buffer sizes (10KB, 100KB,
>> > > 1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
>> > > and channel type [3] (NIO, epoll, domain socket), but coder overhead easily
>> > > dominated the differences in these other experiments.
>> > >
>> > > Further analysis would need to be done to more accurately distill this
>> > > down.
>> > >
>> > > 1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
>> > > 2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
>> > > 3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
>> > >
>> > >
>> > > On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:
>> > >
>> > >> Awesome job Lukasz, Excellent, I have to confess the first time I heard
>> > >> about the Fn API idea I was a bit incredulous, but you are making it real,
>> > >> amazing!
>> > >>
>> > >> Just one question from your document, you said that 80% of the extra (15%)
>> > >> time goes into encoding and decoding the data for your test case, can you
>> > >> expand on your current ideas to improve this? (I am not sure I completely
>> > >> understand the issue).
>> > >>
>> > >>
>> > >> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid>
>> > >> wrote:
>> > >>
>> > >> > Responded inline.
>> > >> >
>> > >> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:
>> > >> >
>> > >> > > This is truly amazing Luke!
>> > >> > >
>> > >> > > If I understand this right, the runner executing the DoFn will delegate
>> > >> > > the function code and input data (and state, coders, etc.) to the
>> > >> > > container where it will execute with the user's SDK of choice, right ?
>> > >> >
>> > >> >
>> > >> > Yes, that is correct.
>> > >> >
>> > >> >
>> > >> > > I wonder how the containers relate to the underlying engine's worker
>> > >> > > processes? Is it 1-1, one container per worker? If there's less "work"
>> > >> > > for the worker's Java process (for example) now and it becomes a sort of
>> > >> > > "dispatcher", would that change the resource allocation commonly used for
>> > >> > > the same Pipeline so that the worker processes would require fewer
>> > >> > > resources, while giving those to the container?
>> > >> > >
>> > >> >
>> > >> > I think with the four services (control, data, state, logging) you can go
>> > >> > with a 1-1 relationship or break it up more finely and dedicate some
>> > >> > machines to specific tasks. For example, you could have a few machines
>> > >> > dedicated to log aggregation which all the workers push their logs to.
>> > >> > Similarly, you could have some machines with a lot of memory, which would
>> > >> > be better able to do shuffles in memory, and this cluster of high memory
>> > >> > machines could front the data service. I believe there is a lot of
>> > >> > flexibility based upon what a runner can do and what it specializes in, and
>> > >> > that with more effort come more possibilities, albeit with increased
>> > >> > internal complexity.
>> > >> >
>> > >> > The layout of resources depends on whether the services and SDK containers
>> > >> > are co-hosted on the same machine or whether there is a different
>> > >> > architecture in play. In a co-hosted configuration, it seems likely that
>> > >> > the SDK container will get more resources, but that depends on the runner
>> > >> > and pipeline shape (shuffle-heavy pipelines will look different than
>> > >> > ParDo-dominated pipelines).
>> > >> >
>> > >> >
>> > >> > > About executing sub-graphs, would it be true to say that as long as
>> > >> > > there's no shuffle, you could keep executing in the same container ?
>> > >> > > meaning that the graph is broken into sub-graphs by shuffles ?
>> > >> > >
>> > >> >
>> > >> > The only thing that is required is that the Apache Beam model is preserved
>> > >> > so typical break points will be at shuffles and language crossing points
>> > >> > (e.g. Python ParDo -> Java ParDo). A runner is free to break up the graph
>> > >> > even more for other reasons.
>> > >> >
>> > >> >
>> > >> > > I have to dig-in deeper, so I could have more questions ;-) thanks Luke!
>> > >> > >
>> > >> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lcwik@google.com.invalid>
>> > >> > > wrote:
>> > >> > >
>> > >> > > > I updated the PR description to contain the same.
>> > >> > > >
>> > >> > > > I would start by looking at the API/object model definitions found in
>> > >> > > > beam_fn_api.proto
>> > >> > > > <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>
>> > >> > > >
>> > >> > > > Then depending on your interest, look at the following:
>> > >> > > > * FnHarness.java
>> > >> > > > <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java>
>> > >> > > > is the main entry point.
>> > >> > > > * org.apache.beam.fn.harness.data
>> > >> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
>> > >> > > > contains the most interesting bits of code since it is able to
>> > >> > > > demultiplex a gRPC stream into multiple logical streams of elements bound
>> > >> > > > for multiple concurrent process bundle requests. It also contains the
>> > >> > > > code to take multiple logical outbound streams and multiplex them back
>> > >> > > > onto a gRPC stream.
>> > >> > > > * org.apache.beam.runners.core
>> > >> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
>> > >> > > > contains additional runners akin to the DoFnRunner found in runners-core
>> > >> > > > to support sources and gRPC endpoints.
>> > >> > > >
>> > >> > > > Unless you're really interested in how domain sockets, epoll, and NIO
>> > >> > > > channel factories or stream readiness callbacks work in gRPC, I would
>> > >> > > > avoid the packages org.apache.beam.fn.harness.channel and
>> > >> > > > org.apache.beam.fn.harness.stream. Similarly I would avoid
>> > >> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as they
>> > >> > > > don't add anything meaningful to the API.
>> > >> > > >
>> > >> > > > Code package descriptions:
>> > >> > > >
>> > >> > > > org.apache.beam.fn.harness.FnHarness: main entry point
>> > >> > > > org.apache.beam.fn.harness.control: Control service client and individual
>> > >> > > > request handlers
>> > >> > > > org.apache.beam.fn.harness.data: Data service client and logical stream
>> > >> > > > multiplexing
>> > >> > > > org.apache.beam.runners.core: Additional runners akin to the DoFnRunner
>> > >> > > > found in runners-core to support sources and gRPC endpoints
>> > >> > > > org.apache.beam.fn.harness.logging: Logging client implementation and JUL
>> > >> > > > logging handler adapter
>> > >> > > > org.apache.beam.fn.harness.channel: gRPC channel management
>> > >> > > > org.apache.beam.fn.harness.stream: gRPC stream management
>> > >> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
>> > >> > > >
>> > >> > > >
>> > >> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <klk@google.com.invalid>
>> > >> > > > wrote:
>> > >> > > >
>> > >> > > > > This is awesome! Any chance you could roadmap the PR for us with some
>> > >> > > > > links into the most interesting bits?
>> > >> > > > >
>> > >> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <robertwb@google.com.invalid>
>> > >> > > > > wrote:
>> > >> > > > >
>> > >> > > > > > Also, note that we can still support the "simple" case. For example,
>> > >> > > > > > if the user supplies us with a jar file (as they do now) a runner
>> > >> > > > > > could launch it as a subprocess and communicate with it via this
>> > >> > > > > > same Fn API or install it in a fixed container itself--the user
>> > >> > > > > > doesn't *need* to know about docker or manually manage containers (and
>> > >> > > > > > indeed the Fn API could be used in-process, cross-process,
>> > >> > > > > > cross-container, and even cross-machine).
>> > >> > > > > >
>> > >> > > > > > However docker provides a nice cross-language way of specifying the
>> > >> > > > > > environment including all dependencies (especially for languages like
>> > >> > > > > > Python or C where the equivalent of a cross-platform, self-contained
>> > >> > > > > > jar isn't as easy to produce) and is strictly more powerful and
>> > >> > > > > > flexible (specifically it isolates the runtime environment and one can
>> > >> > > > > > even use it for local testing).
>> > >> > > > > >
>> > >> > > > > > Slicing a worker up like this without sacrificing performance is an
>> > >> > > > > > ambitious goal, but essential to the story of being able to mix and
>> > >> > > > > > match runners and SDKs arbitrarily, and I think this is a great start.
>> > >> > > > > >
>> > >> > > > > >
>> > >> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lcwik@google.com.invalid>
>> > >> > > > > > wrote:
>> > >> > > > > > > You're correct, a docker container is created that contains the
>> > >> > > > > > > execution environment the user wants or the user re-uses an existing
>> > >> > > > > > > one (allowing for a user to embed all their code/dependencies or use a
>> > >> > > > > > > container that can deploy code/dependencies on demand).
>> > >> > > > > > > A user creates a pipeline saying which docker container they want to
>> > >> > > > > > > use (this starts to allow for multiple container definitions within a
>> > >> > > > > > > single pipeline to support multiple languages, versioning, ...).
>> > >> > > > > > > A runner would then be responsible for launching one or more of these
>> > >> > > > > > > containers in a cluster manager of their choice (scaling up or down
>> > >> > > > > > > the number of instances depending on demand/load/...).
>> > >> > > > > > > A runner then interacts with the docker containers over the gRPC
>> > >> > > > > > > service definitions to delegate processing to them.
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <jb@nanthrax.net>
>> > >> > > > > > > wrote:
>> > >> > > > > > >
>> > >> > > > > > >> Hi Luke,
>> > >> > > > > > >>
>> > >> > > > > > >> that's really great and very promising !
>> > >> > > > > > >>
>> > >> > > > > > >> It's really ambitious but I like the idea. Just to clarify: the
>> > >> > > > > > >> purpose of using gRPC is that once the docker container is running,
>> > >> > > > > > >> we can "interact" with the container to spread and delegate
>> > >> > > > > > >> processing to the docker container, correct?
>> > >> > > > > > >> The users/devops have to set up the docker containers as a
>> > >> > > > > > >> prerequisite.
>> > >> > > > > > >> Then, the "location" of the containers (kind of container registry)
>> > >> > > > > > >> is set via the pipeline options and used by gRPC?
>> > >> > > > > > >>
>> > >> > > > > > >> Thanks Luke !
>> > >> > > > > > >>
>> > >> > > > > > >> Regards
>> > >> > > > > > >> JB
>> > >> > > > > > >>
>> > >> > > > > > >>
>> > >> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>> > >> > > > > > >>
>> > >> > > > > > >>> I have been prototyping several components towards the Beam
>> > >> > > > > > >>> technical vision of being able to execute an arbitrary language
>> > >> > > > > > >>> using an arbitrary runner.
>> > >> > > > > > >>>
>> > >> > > > > > >>> I would like to share this overview [1] of what I have been working
>> > >> > > > > > >>> towards. I also share this PR [2] with a proposed API, service
>> > >> > > > > > >>> definitions and partial implementation.
>> > >> > > > > > >>>
>> > >> > > > > > >>> 1: https://s.apache.org/beam-fn-api
>> > >> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
>> > >> > > > > > >>>
>> > >> > > > > > >>> Please comment on the overview within this thread, and any specific
>> > >> > > > > > >>> code comments on the PR directly.
>> > >> > > > > > >>>
>> > >> > > > > > >>> Luke
>> > >> > > > > > >>>
>> > >> > > > > > >>>
>> > >> > > > > > >> --
>> > >> > > > > > >> Jean-Baptiste Onofré
>> > >> > > > > > >> jbonofre@apache.org
>> > >> > > > > > >> http://blog.nanthrax.net
>> > >> > > > > > >> Talend - http://www.talend.com
>> > >> > > > > > >>
>> > >> > > > > >
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>

Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
Manu, the goal is to share here initially, update the docs addressing
people's comments, and then publish them on the website once they are
stable enough.

On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com> wrote:

> Thanks Lukasz. The following two links were somehow incorrectly formatted
> in your mail.
>
> * How to process a bundle:
> https://s.apache.org/beam-fn-api-processing-a-bundle
> * How to send and receive data:
> https://s.apache.org/beam-fn-api-send-and-receive-data
>
> By the way, is there a way to find them from the Beam website?
>
>
> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > Now that I'm back from vacation and the 2.0.0 release is not taking all my
> > time, I am focusing my attention on working on the Beam Portability
> > framework, specifically the Fn API so that we can get Python and other
> > language integrations to work with any runner.
> >
> > For newcomers, I would like to reshare the overview:
> > https://s.apache.org/beam-fn-api
> >
> > And for those of you who have been following this thread and contributors
> > focusing on Runner integration with Apache Beam:
> > * How to process a bundle:
> > https://s.apache.org/beam-fn-api-processing-a-bundle
> > * How to send and receive data:
> > https://s.apache.org/beam-fn-api-send-and-receive-data
> >
> > If you want to dive deeper, you should look at:
> > * Runner API Protobuf:
> > https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
> > * Fn API Protobuf:
> > https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> > * Java SDK Harness:
> > https://github.com/apache/beam/tree/master/sdks/java/harness
> > * Python SDK Harness:
> > https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker
> >
> > Next I'm planning on talking about Beam Fn State API and will need help
> > from Runner contributors to talk about caching semantics and key spaces and
> > whether the integrations mesh well with current Runner implementations. The
> > State API is meant to support user state, side inputs, and re-iteration for
> > large values produced by GroupByKey.
> >
> > On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
> >
> > > Yes, I was using a Pipeline that was:
> > > Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
> > > pipeline in the global window using the default trigger)
> > >
> > > In Google Cloud Dataflow, the shuffle step uses the binary representation
> > > to compare keys, so the above pipeline would normally be converted to the
> > > following two stages:
> > > Read -> GBK Writer
> > > GBK Reader -> IdentityParDo
> > >
> > > Note that the GBK Writer and GBK Reader need to use a coder to encode and
> > > decode the value.
> > >
> > > When using the Fn API, those two stages expand because of the Fn API
> > > crossings using a gRPC Write/Read pair:
> > > Read -> gRPC Write -> gRPC Read -> GBK Writer
> > > GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
> > >
> > > In my naive prototype implementation, the coder was used to encode
> > > elements at the gRPC steps. This meant that the coder was
> > > encoding/decoding/encoding in the first stage and
> > > decoding/encoding/decoding in the second stage. This tripled the number
> > > of times the coder was being invoked per element. This additional use of
> > > the coder accounted for ~12% (80% of the 15%) of the extra execution time.
> > > This implementation is quite inefficient and would benefit from merging
> > > the gRPC Read + GBK Writer into one actor and also the GBK Reader + gRPC
> > > Write into another actor, allowing for the creation of a fast path that
> > > can skip parts of the decode/encode cycle through the coder. By using a
> > > byte array view over the logical stream, one can minimize the number of
> > > byte array copies which plagued my naive implementation. This can be done
> > > by only parsing the element boundaries out of the stream to produce those
> > > logical byte array views. I have a very rough estimate that performing
> > > this optimization would reduce the 12% overhead to somewhere between 4%
> > > and 6%.
> > >
> > > The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
> > > marshalling/unmarshalling protos
> > > handling/managing the socket
> > > flow control
> > > ...
> > >
> > > Finally, I did try experiments with different buffer sizes (10KB, 100KB,
> > > 1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
> > > and channel type [3] (NIO, epoll, domain socket), but coder overhead
> > > easily dominated the differences in these other experiments.
> > >
> > > Further analysis would need to be done to more accurately distill this
> > > down.
> > >
> > > 1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
> > > 2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
> > > 3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
> > >
> > >
> > > On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:
> > >
> > >> Awesome job Lukasz, Excellent, I have to confess the first time I heard
> > >> about the Fn API idea I was a bit incredulous, but you are making it
> > >> real, amazing!
> > >>
> > >> Just one question from your document, you said that 80% of the extra
> > >> (15%) time goes into encoding and decoding the data for your test case,
> > >> can you expand on your current ideas to improve this? (I am not sure I
> > >> completely understand the issue).
> > >>
> > >>
> > >> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lcwik@google.com.invalid>
> > >> wrote:
> > >>
> > >> > Responded inline.
> > >> >
> > >> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:
> > >> >
> > >> > > This is truly amazing Luke!
> > >> > >
> > >> > > If I understand this right, the runner executing the DoFn will
> > >> > > delegate the function code and input data (and state, coders, etc.)
> > >> > > to the container where it will execute with the user's SDK of choice,
> > >> > > right ?
> > >> >
> > >> >
> > >> > Yes, that is correct.
> > >> >
> > >> >
> > >> > > I wonder how the containers relate to the underlying engine's worker
> > >> > > processes? Is it 1-1, a container per worker? If there's less "work"
> > >> > > for the worker's Java process (for example) now and it becomes a sort
> > >> > > of "dispatcher", would that change the resource allocation commonly
> > >> > > used for the same Pipeline so that the worker processes would require
> > >> > > fewer resources, while giving those to the container?
> > >> > >
> > >> >
> > >> > I think with the four services (control, data, state, logging) you can
> > >> > go with a 1-1 relationship or break it up at a finer granularity and
> > >> > dedicate some machines to specific tasks. For example, you could have a
> > >> > few machines dedicated to log aggregation which all the workers push
> > >> > their logs to. Similarly, you could have some machines with a lot of
> > >> > memory, which would be better suited to doing shuffles in memory, and
> > >> > this cluster of high memory machines could front the data service. I
> > >> > believe there is a lot of flexibility based upon what a runner can do
> > >> > and what it specializes in, and that with more effort come more
> > >> > possibilities, albeit with increased internal complexity.
> > >> >
> > >> > The layout of resources depends on whether the services and SDK
> > >> > containers are co-hosted on the same machine or whether there is a
> > >> > different architecture in play. In a co-hosted configuration, it seems
> > >> > likely that the SDK container will get more resources, but this depends
> > >> > on the runner and pipeline shape (shuffle-heavy pipelines will look
> > >> > different than ParDo-dominated pipelines).
> > >> >
> > >> >
> > >> > > About executing sub-graphs, would it be true to say that as long as
> > >> > > there's no shuffle, you could keep executing in the same container ?
> > >> > > meaning that the graph is broken into sub-graphs by shuffles ?
> > >> > >
> > >> >
> > >> > The only thing that is required is that the Apache Beam model is
> > >> > preserved so typical break points will be at shuffles and language
> > >> > crossing points (e.g. Python ParDo -> Java ParDo). A runner is free to
> > >> > break up the graph even more for other reasons.
> > >> >
> > >> >
> > >> > > I have to dig-in deeper, so I could have more questions ;-) thanks Luke!
> > >> > >
> > >> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lcwik@google.com.invalid>
> > >> > > wrote:
> > >> > >
> > >> > > > I updated the PR description to contain the same.
> > >> > > >
> > >> > > > I would start by looking at the API/object model definitions found in
> > >> > > > beam_fn_api.proto
> > >> > > > <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>
> > >> > > >
> > >> > > > Then depending on your interest, look at the following:
> > >> > > > * FnHarness.java
> > >> > > > <https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java>
> > >> > > > is the main entry point.
> > >> > > > * org.apache.beam.fn.harness.data
> > >> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
> > >> > > > contains the most interesting bits of code since it is able to
> > >> > > > multiplex a gRPC stream into multiple logical streams of elements
> > >> > > > bound for multiple concurrent process bundle requests. It also
> > >> > > > contains the code to take multiple logical outbound streams and
> > >> > > > multiplex them back onto a gRPC stream.
> > >> > > > * org.apache.beam.runners.core
> > >> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
> > >> > > > contains additional runners akin to the DoFnRunner found in
> > >> > > > runners-core to support sources and gRPC endpoints.
> > >> > > >
> > >> > > > Unless you're really interested in how domain sockets, epoll, nio
> > >> > > > channel factories or how stream readiness callbacks work in gRPC, I
> > >> > > > would avoid the packages org.apache.beam.fn.harness.channel and
> > >> > > > org.apache.beam.fn.harness.stream. Similarly I would avoid
> > >> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as
> > >> > > > they don't add anything meaningful to the api.
> > >> > > >
> > >> > > > Code package descriptions:
> > >> > > >
> > >> > > > org.apache.beam.fn.harness.FnHarness: main entry point
> > >> > > > org.apache.beam.fn.harness.control: Control service client and
> > >> > > > individual request handlers
> > >> > > > org.apache.beam.fn.harness.data: Data service client and logical
> > >> > > > stream multiplexing
> > >> > > > org.apache.beam.runners.core: Additional runners akin to the
> > >> > > > DoFnRunner found in runners-core to support sources and gRPC endpoints
> > >> > > > org.apache.beam.fn.harness.logging: Logging client implementation and
> > >> > > > JUL logging handler adapter
> > >> > > > org.apache.beam.fn.harness.channel: gRPC channel management
> > >> > > > org.apache.beam.fn.harness.stream: gRPC stream management
> > >> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
> > >> > > >
> > >> > > >
> > >> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <klk@google.com.invalid>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > This is awesome! Any chance you could roadmap the PR for us with
> > >> > > > > some links into the most interesting bits?
> > >> > > > >
> > >> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> > >> > > > > robertwb@google.com.invalid> wrote:
> > >> > > > >
> > >> > > > > > Also, note that we can still support the "simple" case. For
> > >> > > > > > example, if the user supplies us with a jar file (as they do now) a
> > >> > > > > > runner could launch it as a subprocess and communicate with it via
> > >> > > > > > this same Fn API or install it in a fixed container itself--the user
> > >> > > > > > doesn't *need* to know about docker or manually manage containers
> > >> > > > > > (and indeed the Fn API could be used in-process, cross-process,
> > >> > > > > > cross-container, and even cross-machine).
> > >> > > > > >
> > >> > > > > > However docker provides a nice cross-language way of specifying the
> > >> > > > > > environment including all dependencies (especially for languages
> > >> > > > > > like Python or C where the equivalent of a cross-platform,
> > >> > > > > > self-contained jar isn't as easy to produce) and is strictly more
> > >> > > > > > powerful and flexible (specifically it isolates the runtime
> > >> > > > > > environment and one can even use it for local testing).
> > >> > > > > >
> > >> > > > > > Slicing a worker up like this without sacrificing performance is an
> > >> > > > > > ambitious goal, but essential to the story of being able to mix and
> > >> > > > > > match runners and SDKs arbitrarily, and I think this is a great
> > >> > > > > > start.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lcwik@google.com.invalid>
> > >> > > > > > wrote:
> > >> > > > > > > You're correct, a docker container is created that contains the
> > >> > > > > > > execution environment the user wants or the user re-uses an
> > >> > > > > > > existing one (allowing for a user to embed all their
> > >> > > > > > > code/dependencies or use a container that can deploy
> > >> > > > > > > code/dependencies on demand).
> > >> > > > > > > A user creates a pipeline saying which docker container they want
> > >> > > > > > > to use (this starts to allow for multiple container definitions
> > >> > > > > > > within a single pipeline to support multiple languages,
> > >> > > > > > > versioning, ...).
> > >> > > > > > > A runner would then be responsible for launching one or more of
> > >> > > > > > > these containers in a cluster manager of their choice (scaling up
> > >> > > > > > > or down the number of instances depending on demand/load/...).
> > >> > > > > > > A runner then interacts with the docker containers over the gRPC
> > >> > > > > > > service definitions to delegate processing to them.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <jb@nanthrax.net>
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > >> Hi Luke,
> > >> > > > > > >>
> > >> > > > > > >> that's really great and very promising !
> > >> > > > > > >>
> > >> > > > > > >> It's really ambitious but I like the idea. Just to clarify: the
> > >> > > > > > >> purpose of using gRPC is that once the docker container is
> > >> > > > > > >> running, we can "interact" with the container to spread and
> > >> > > > > > >> delegate processing to the docker container, correct?
> > >> > > > > > >> The users/devops have to set up the docker containers as a
> > >> > > > > > >> prerequisite.
> > >> > > > > > >> Then, the "location" of the containers (kind of container
> > >> > > > > > >> registry) is set via the pipeline options and used by gRPC?
> > >> > > > > > >>
> > >> > > > > > >> Thanks Luke !
> > >> > > > > > >>
> > >> > > > > > >> Regards
> > >> > > > > > >> JB
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> > >> > > > > > >>
> > >> > > > > > >>> I have been prototyping several components towards the Beam
> > >> > > > > > >>> technical vision of being able to execute an arbitrary language
> > >> > > > > > >>> using an arbitrary runner.
> > >> > > > > > >>>
> > >> > > > > > >>> I would like to share this overview [1] of what I have been
> > >> > > > > > >>> working towards. I also share this PR [2] with a proposed API,
> > >> > > > > > >>> service definitions and partial implementation.
> > >> > > > > > >>>
> > >> > > > > > >>> 1: https://s.apache.org/beam-fn-api
> > >> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
> > >> > > > > > >>>
> > >> > > > > > >>> Please comment on the overview within this thread, and any
> > >> > > > > > >>> specific code comments on the PR directly.
> > >> > > > > > >>>
> > >> > > > > > >>> Luke
> > >> > > > > > >>>
> > >> > > > > > >>>
> > >> > > > > > >> --
> > >> > > > > > >> Jean-Baptiste Onofré
> > >> > > > > > >> jbonofre@apache.org
> > >> > > > > > >> http://blog.nanthrax.net
> > >> > > > > > >> Talend - http://www.talend.com
> > >> > > > > > >>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
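
For readers following the quoted coder-overhead discussion above, the "byte
array view" idea can be illustrated with a small sketch. This is not code from
the prototype or the Fn API data plane: the varint length-prefix framing and
the helper names (encode_varint, frame, element_views) are assumptions chosen
only for illustration. The point is that the reader parses just the element
boundaries and hands out views into the original buffer, so element payloads
are neither decoded nor copied.

```python
from typing import Iterator, List


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as a base-128 varint (assumed framing)."""
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)  # more bytes follow
        else:
            out.append(low)
            return bytes(out)


def frame(elements: List[bytes]) -> bytes:
    """Concatenate already-encoded elements, each preceded by its length."""
    return b"".join(encode_varint(len(e)) + e for e in elements)


def element_views(stream: bytes) -> Iterator[memoryview]:
    """Yield one zero-copy view per element, parsing only the boundaries."""
    buf = memoryview(stream)
    pos = 0
    while pos < len(buf):
        # Read the varint length prefix; the payload itself is never decoded.
        length, shift = 0, 0
        while True:
            byte = buf[pos]
            pos += 1
            length |= (byte & 0x7F) << shift
            shift += 7
            if not byte & 0x80:
                break
        # Slicing a memoryview shares the underlying buffer (no copy).
        yield buf[pos:pos + length]
        pos += length


encoded = [b"key1value1", b"", b"k" * 300]
stream = frame(encoded)
views = list(element_views(stream))
```

Each view can be passed straight to a writer (for example a shuffle writer)
without a decode/encode round trip through the coder, which is the fast path
the quoted message describes. A real implementation would also need to handle
truncated input and elements that span gRPC message boundaries.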

Re: Beam Fn API

Posted by Manu Zhang <ow...@gmail.com>.
Thanks Lukasz. The following two links were somehow incorrectly formatted
in your mail.

* How to process a bundle:
https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data:
https://s.apache.org/beam-fn-api-send-and-receive-data

By the way, is there a way to find them from the Beam website?


On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
wrote:

> Now that I'm back from vacation and the 2.0.0 release is not taking all my
> time, I am focusing my attention on working on the Beam Portability
> framework, specifically the Fn API so that we can get Python and other
> language integrations to work with any runner.
>
> For newcomers, I would like to reshare the overview:
> https://s.apache.org/beam-fn-api
>
> And for those of you who have been following this thread and contributors
> focusing on Runner integration with Apache Beam:
> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-
> bundle
> * How to send and receive data: https://s.apache.org/
> beam-fn-api-send-and-receive-data
>
> If you want to dive deeper, you should look at:
> * Runner API Protobuf:
> https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
> * Fn API Protobuf:
> https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> * Java SDK Harness:
> https://github.com/apache/beam/tree/master/sdks/java/harness
> * Python SDK Harness:
> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker
>
> Next I'm planning on talking about Beam Fn State API and will need help
> from Runner contributors to talk about caching semantics and key spaces and
> whether the integrations mesh well with current Runner implementations. The
> State API is meant to support user state, side inputs, and re-iteration for
> large values produced by GroupByKey.
>
> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>
> > Yes, I was using a Pipeline that was:
> > Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
> > pipeline in the global window using the default trigger)
> >
> > In Google Cloud Dataflow, the shuffle step uses the binary representation
> > to compare keys, so the above pipeline would normally be converted to the
> > following two stages:
> > Read -> GBK Writer
> > GBK Reader -> IdentityParDo
> >
> > Note that the GBK Writer and GBK Reader need to use a coder to encode and
> > decode the value.
> >
> > When using the Fn API, those two stages expand because of the Fn API
> > crossings using a gRPC Write/Read pair:
> > Read -> gRPC Write -> gRPC Read -> GBK Writer
> > GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
> >
> > In my naive prototype implementation, the coder was used to encode
> > elements at the gRPC steps. This meant that the coder was
> > encoding/decoding/encoding in the first stage and
> > decoding/encoding/decoding in the second stage. This tripled the number of
> > times the coder was being invoked per element. This additional use of the
> > coder accounted for ~12% (80% of the 15%) of the extra execution time. This
> > implementation is quite inefficient and would benefit from merging the gRPC
> > Read + GBK Writer into one actor and also the GBK Reader + gRPC Write into
> > another actor, allowing for the creation of a fast path that can skip parts
> > of the decode/encode cycle through the coder. By using a byte array view
> > over the logical stream, one can minimize the number of byte array copies
> > which plagued my naive implementation. This can be done by only parsing the
> > element boundaries out of the stream to produce those logical byte array
> > views. I have a very rough estimate that performing this optimization would
> > reduce the 12% overhead to somewhere between 4% and 6%.
> >
> > The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
> > marshalling/unmarshalling protos
> > handling/managing the socket
> > flow control
> > ...
> >
> > Finally, I did try experiments with different buffer sizes (10KB, 100KB,
> > 1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
> > and channel type [3] (NIO, epoll, domain socket), but coder overhead easily
> > dominated the differences in these other experiments.
> >
> > Further analysis would need to be done to more accurately distill this
> > down.
> >
> > 1: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
> > 2: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
> > 3: https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
> >
> >
> > On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:
> >
> >> Awesome job Lukasz, Excellent, I have to confess the first time I heard
> >> about the Fn API idea I was a bit incredulous, but you are making it real,
> >> amazing!
> >>
> >> Just one question from your document, you said that 80% of the extra (15%)
> >> time goes into encoding and decoding the data for your test case, can you
> >> expand on your current ideas to improve this? (I am not sure I completely
> >> understand the issue).
> >>
> >>
> >> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid>
> >> wrote:
> >>
> >> > Responded inline.
> >> >
> >> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:
> >> >
> >> > > This is truly amazing Luke!
> >> > >
> >> > > If I understand this right, the runner executing the DoFn will delegate
> >> > > the function code and input data (and state, coders, etc.) to the
> >> > > container where it will execute with the user's SDK of choice, right ?
> >> >
> >> >
> >> > Yes, that is correct.
> >> >
> >> >
> >> > > I wonder how the containers relate to the underlying engine's worker
> >> > > processes? Is it 1-1, a container per worker? If there's less "work"
> >> > > for the worker's Java process (for example) now and it becomes a sort
> >> > > of "dispatcher", would that change the resource allocation commonly
> >> > > used for the same Pipeline so that the worker processes would require
> >> > > fewer resources, while giving those to the container?
> >> > >
> >> >
> >> > I think with the four services (control, data, state, logging) you can
> >> > go with a 1-1 relationship or break it up at a finer granularity and
> >> > dedicate some machines to specific tasks. For example, you could have a
> >> > few machines dedicated to log aggregation which all the workers push
> >> > their logs to. Similarly, you could have some machines with a lot of
> >> > memory, which would be better suited to doing shuffles in memory, and
> >> > this cluster of high memory machines could front the data service. I
> >> > believe there is a lot of flexibility based upon what a runner can do
> >> > and what it specializes in, and that with more effort come more
> >> > possibilities, albeit with increased internal complexity.
> >> >
> >> > The layout of resources depends on whether the services and SDK
> >> > containers are co-hosted on the same machine or whether there is a
> >> > different architecture in play. In a co-hosted configuration, it seems
> >> > likely that the SDK container will get more resources, but this depends
> >> > on the runner and pipeline shape (shuffle-heavy pipelines will look
> >> > different than ParDo-dominated pipelines).
> >> >
> >> >
> >> > > About executing sub-graphs, would it be true to say that as long as
> >> > > there's no shuffle, you could keep executing in the same container ?
> >> > > meaning that the graph is broken into sub-graphs by shuffles ?
> >> > >
> >> >
> >> > The only thing that is required is that the Apache Beam model is
> >> > preserved so typical break points will be at shuffles and language
> >> > crossing points (e.g. Python ParDo -> Java ParDo). A runner is free to
> >> > break up the graph even more for other reasons.
> >> >
> >> >
> >> > > I have to dig-in deeper, so I could have more questions ;-) thanks Luke!
> >> > >
> >> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lcwik@google.com.invalid>
> >> > > wrote:
> >> > >
> >> > > > I updated the PR description to contain the same.
> >> > > >
> >> > > > I would start by looking at the API/object model definitions found
> >> in
> >> > > > beam_fn_api.proto
> >> > > > <
> >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> >> > > sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> >> > > > >
> >> > > >
> >> > > > Then depending on your interest, look at the following:
> >> > > > * FnHarness.java
> >> > > > <
> >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> >> > > sdks/java/harness/src/main/java/org/apache/beam/fn/
> >> > harness/FnHarness.java
> >> > > > >
> >> > > > is the main entry point.
> >> > > > * org.apache.beam.fn.harness.data
> >> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
> >> > > > contains the most interesting bits of code since it is able to
> >> > > > demultiplex a gRPC stream into multiple logical streams of elements
> >> > > > bound for multiple concurrent process bundle requests. It also
> >> > > > contains the code to take multiple logical outbound streams and
> >> > > > multiplex them back onto a gRPC stream.
> >> > > > * org.apache.beam.runners.core
> >> > > > <https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
> >> > > > contains additional runners akin to the DoFnRunner found in
> >> > > > runners-core to support sources and gRPC endpoints.
> >> > > >
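The idea of splitting one physical stream into per-bundle logical streams, and interleaving them back, can be sketched in a few lines. This is not the harness code: the `(bundle id, payload)` frame shape and the function names `demultiplex` and `multiplex` are assumptions made for the illustration, and no gRPC is involved.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Hypothetical frame shape: (bundle id, payload). The real Fn API message
# layout is defined in beam_fn_api.proto and is not reproduced here.
Frame = Tuple[str, bytes]


def demultiplex(frames: Iterable[Frame]) -> Dict[str, List[bytes]]:
    """Route frames from one physical stream into per-bundle logical streams."""
    streams: Dict[str, List[bytes]] = defaultdict(list)
    for bundle_id, payload in frames:
        streams[bundle_id].append(payload)
    return dict(streams)


def multiplex(streams: Dict[str, List[bytes]]) -> List[Frame]:
    """Interleave logical streams round-robin back onto one physical stream."""
    iterators = {bundle_id: iter(payloads) for bundle_id, payloads in streams.items()}
    out: List[Frame] = []
    while iterators:
        for bundle_id in list(iterators):
            try:
                out.append((bundle_id, next(iterators[bundle_id])))
            except StopIteration:
                # This logical stream is exhausted; stop polling it.
                del iterators[bundle_id]
    return out


inbound = [("b1", b"a"), ("b2", b"x"), ("b1", b"b")]
logical = demultiplex(inbound)
print(logical)
print(multiplex(logical))
```

Tagging every frame with its bundle id is what lets several concurrent process bundle requests share a single connection; the round-robin policy here is just one simple choice for the outbound direction.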
> >> > > > Unless you're really interested in how domain sockets, epoll, NIO
> >> > > > channel factories, or stream readiness callbacks work in gRPC, I
> >> > > > would avoid the packages org.apache.beam.fn.harness.channel and
> >> > > > org.apache.beam.fn.harness.stream. Similarly, I would avoid
> >> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as
> >> > > > they don't add anything meaningful to the API.
> >> > > >
> >> > > > Code package descriptions:
> >> > > >
> >> > > > org.apache.beam.fn.harness.FnHarness: main entry point
> >> > > > org.apache.beam.fn.harness.control: Control service client and
> >> > > >   individual request handlers
> >> > > > org.apache.beam.fn.harness.data: Data service client and logical
> >> > > >   stream multiplexing
> >> > > > org.apache.beam.runners.core: Additional runners akin to the
> >> > > >   DoFnRunner found in runners-core to support sources and gRPC
> >> > > >   endpoints
> >> > > > org.apache.beam.fn.harness.logging: Logging client implementation and
> >> > > >   JUL logging handler adapter
> >> > > > org.apache.beam.fn.harness.channel: gRPC channel management
> >> > > > org.apache.beam.fn.harness.stream: gRPC stream management
> >> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
> >> > > >
> >> > > >
> >> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
> >> > > > <klk@google.com.invalid> wrote:
> >> > > >
> >> > > > > This is awesome! Any chance you could roadmap the PR for us with
> >> > > > > some links into the most interesting bits?
> >> > > > >
> >> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> >> > > > > robertwb@google.com.invalid> wrote:
> >> > > > >
> >> > > > > > Also, note that we can still support the "simple" case. For
> >> > > > > > example, if the user supplies us with a jar file (as they do now),
> >> > > > > > a runner could launch it as a subprocess and communicate with it
> >> > > > > > via this same Fn API or install it in a fixed container itself--the
> >> > > > > > user doesn't *need* to know about docker or manually manage
> >> > > > > > containers (and indeed the Fn API could be used in-process,
> >> > > > > > cross-process, cross-container, and even cross-machine).
> >> > > > > >
> >> > > > > > However, docker provides a nice cross-language way of specifying
> >> > > > > > the environment including all dependencies (especially for
> >> > > > > > languages like Python or C where the equivalent of a
> >> > > > > > cross-platform, self-contained jar isn't as easy to produce) and is
> >> > > > > > strictly more powerful and flexible (specifically, it isolates the
> >> > > > > > runtime environment and one can even use it for local testing).
> >> > > > > >
> >> > > > > > Slicing a worker up like this without sacrificing performance is
> >> > > > > > an ambitious goal, but essential to the story of being able to mix
> >> > > > > > and match runners and SDKs arbitrarily, and I think this is a great
> >> > > > > > start.
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
> >> > > > > > <lcwik@google.com.invalid> wrote:
> >> > > > > > > You're correct, a docker container is created that contains the
> >> > > > > > > execution environment the user wants, or the user re-uses an
> >> > > > > > > existing one (allowing for a user to embed all their
> >> > > > > > > code/dependencies or use a container that can deploy
> >> > > > > > > code/dependencies on demand).
> >> > > > > > > A user creates a pipeline saying which docker container they want
> >> > > > > > > to use (this starts to allow for multiple container definitions
> >> > > > > > > within a single pipeline to support multiple languages,
> >> > > > > > > versioning, ...).
> >> > > > > > > A runner would then be responsible for launching one or more of
> >> > > > > > > these containers in a cluster manager of their choice (scaling up
> >> > > > > > > or down the number of instances depending on demand/load/...).
> >> > > > > > > A runner then interacts with the docker containers over the gRPC
> >> > > > > > > service definitions to delegate processing to them.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré
> >> > > > > > > <jb@nanthrax.net> wrote:
> >> > > > > > >
> >> > > > > > >> Hi Luke,
> >> > > > > > >>
> >> > > > > > >> that's really great and very promising !
> >> > > > > > >>
> >> > > > > > >> It's really ambitious but I like the idea. Just to clarify: the
> >> > > > > > >> purpose of using gRPC is that once the docker container is
> >> > > > > > >> running, we can "interact" with the container to spread and
> >> > > > > > >> delegate processing to the docker container, correct?
> >> > > > > > >> The users/devops have to set up the docker containers as a
> >> > > > > > >> prerequisite.
> >> > > > > > >> Then, the "location" of the containers (kind of container
> >> > > > > > >> registry) is set via the pipeline options and used by gRPC?
> >> > > > > > >>
> >> > > > > > >> Thanks Luke !
> >> > > > > > >>
> >> > > > > > >> Regards
> >> > > > > > >> JB
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> >> > > > > > >>
> >> > > > > > >>> I have been prototyping several components towards the Beam
> >> > > > > > >>> technical vision of being able to execute an arbitrary language
> >> > > > > > >>> using an arbitrary runner.
> >> > > > > > >>>
> >> > > > > > >>> I would like to share this overview [1] of what I have been
> >> > > > > > >>> working towards. I also share this PR [2] with a proposed API,
> >> > > > > > >>> service definitions and partial implementation.
> >> > > > > > >>>
> >> > > > > > >>> 1: https://s.apache.org/beam-fn-api
> >> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
> >> > > > > > >>>
> >> > > > > > >>> Please comment on the overview within this thread, and any
> >> > > > > > >>> specific code comments on the PR directly.
> >> > > > > > >>>
> >> > > > > > >>> Luke
> >> > > > > > >>>
> >> > > > > > >>>
> >> > > > > > >> --
> >> > > > > > >> Jean-Baptiste Onofré
> >> > > > > > >> jbonofre@apache.org
> >> > > > > > >> http://blog.nanthrax.net
> >> > > > > > >> Talend - http://www.talend.com
> >> > > > > > >>
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>