You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com.INVALID> on 2017/01/19 23:56:19 UTC

Beam Fn API

I have been prototyping several components towards the Beam technical
vision of being able to execute an arbitrary language using an arbitrary
runner.

I would like to share this overview [1] of what I have been working
towards. I also share this PR [2] with a proposed API, service definitions
and partial implementation.

1: https://s.apache.org/beam-fn-api
2: https://github.com/apache/beam/pull/1801

Please comment on the overview within this thread, and any specific code
comments on the PR directly.

Luke

Re: Beam Fn API

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
Thank! Looks good. I've added some comments to the doc.

On Wed, May 31, 2017 at 7:00 AM, Etienne Chauchot <ec...@gmail.com>
wrote:

> Thanks for all these docs! They are exactly what was needed for new
> contributors as discussed in this thread
>
> https://lists.apache.org/thread.html/ac93d29424e19d57097373b
> 78f3f5bcbc701e4b51385a52a6e27b7ed@%3Cdev.beam.apache.org%3E
>
> Etienne
>
>
> Le 31/05/2017 à 11:12, Aljoscha Krettek a écrit :
>
>> Thanks for banging these out Lukasz. I’ll try and read them all this week.
>>
>> We’re also planning to add support for the Fn API to the Flink Runner so
>> that we can execute Python programs. I’m sure we’ll get some valuable
>> feedback for you while doing that.
>>
>> On 26. May 2017, at 22:49, Lukasz Cwik <lc...@google.com.INVALID> wrote:
>>>
>>> I would like to share another document about the Fn API. This document
>>> specifically discusses how to access side inputs, access remote
>>> references
>>> (e.g. large iterables for hot keys produced by a GBK), and support user
>>> state.
>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing
>>>
>>> The document does require a strong foundation in the Apache Beam model
>>> and
>>> a good understanding of the prior shared docs:
>>> * How to process a bundle: https://s.apache.org/beam-fn-api
>>> -processing-a-bundle
>>> * How to send and receive data: https://s.apache.org/beam-fn-api
>>> -send-and-receive-data
>>>
>>> I could really use the help of runner contributors to review the caching
>>> semantics within the SDK harness and whether they would work well for the
>>> runner they contribute to the most.
>>>
>>> On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>> Manu, the goal is to share here initially, update the docs addressing
>>>> people's comments, and then publish them on the website once they are
>>>> stable enough.
>>>>
>>>> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com>
>>>> wrote:
>>>>
>>>> Thanks Lukasz. The following two links were somehow incorrectly
>>>>> formatted
>>>>> in your mail.
>>>>>
>>>>> * How to process a bundle:
>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle
>>>>> * How to send and receive data:
>>>>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>>>>
>>>>> By the way, is there a way to find them from the Beam website ?
>>>>>
>>>>>
>>>>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>>>>> wrote:
>>>>>
>>>>> Now that I'm back from vacation and the 2.0.0 release is not taking all
>>>>>>
>>>>> my
>>>>>
>>>>>> time, I am focusing my attention on working on the Beam Portability
>>>>>> framework, specifically the Fn API so that we can get Python and other
>>>>>> language integrations work with any runner.
>>>>>>
>>>>>> For new comers, I would like to reshare the overview:
>>>>>> https://s.apache.org/beam-fn-api
>>>>>>
>>>>>> And for those of you who have been following this thread and
>>>>>>
>>>>> contributors
>>>>>
>>>>>> focusing on Runner integration with Apache Beam:
>>>>>> * How to process a bundle: https://s.apache.org/beam-fn-a
>>>>>>
>>>>> pi-processing-a-
>>>>>
>>>>>> bundle
>>>>>> * How to send and receive data: https://s.apache.org/
>>>>>> beam-fn-api-send-and-receive-data
>>>>>>
>>>>>> If you want to dive deeper, you should look at:
>>>>>> * Runner API Protobuf: https://github.com/apache/beam
>>>>>> /blob/master/sdks/
>>>>>> common/runner-api/src/main/proto/beam_runner_api.proto
>>>>>> <https://github.com/apache/beam/blob/master/sdks/common/runn
>>>>>>
>>>>> er-api/src/main/proto/beam_runner_api.proto>
>>>>>
>>>>>> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>>>>>> common/fn-api/src/main/proto/beam_fn_api.proto
>>>>>> <https://github.com/apache/beam/blob/master/sdks/common/fn-
>>>>>>
>>>>> api/src/main/proto/beam_fn_api.proto>
>>>>>
>>>>>> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>>>>>> java/harness
>>>>>> <https://github.com/apache/beam/tree/master/sdks/java/harness>
>>>>>> * Python SDK Harness: https://github.com/apache/beam
>>>>>> /tree/master/sdks/
>>>>>> python/apache_beam/runners/worker
>>>>>> <https://github.com/apache/beam/tree/master/sdks/python/apac
>>>>>>
>>>>> he_beam/runners/worker>
>>>>>
>>>>>> Next I'm planning on talking about Beam Fn State API and will need
>>>>>> help
>>>>>> from Runner contributors to talk about caching semantics and key
>>>>>> spaces
>>>>>>
>>>>> and
>>>>>
>>>>>> whether the integrations mesh well with current Runner
>>>>>> implementations.
>>>>>>
>>>>> The
>>>>>
>>>>>> State API is meant to support user state, side inputs, and
>>>>>> re-iteration
>>>>>>
>>>>> for
>>>>>
>>>>>> large values produced by GroupByKey.
>>>>>>
>>>>>> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>> Yes, I was using a Pipeline that was:
>>>>>>> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a
>>>>>>>
>>>>>> batch
>>>>>
>>>>>> pipeline in the global window using the default trigger)
>>>>>>>
>>>>>>> In Google Cloud Dataflow, the shuffle step uses the binary
>>>>>>>
>>>>>> representation
>>>>>
>>>>>> to compare keys, so the above pipeline would normally be converted to
>>>>>>>
>>>>>> the
>>>>>
>>>>>> following two stages:
>>>>>>> Read -> GBK Writer
>>>>>>> GBK Reader -> IdentityParDo
>>>>>>>
>>>>>>> Note that the GBK Writer and GBK Reader need to use a coder to encode
>>>>>>>
>>>>>> and
>>>>>
>>>>>> decode the value.
>>>>>>>
>>>>>>> When using the Fn API, those two stages expanded because of the Fn
>>>>>>> Api
>>>>>>> crossings using a gRPC Write/Read pair:
>>>>>>> Read -> gRPC Write -> gRPC Read -> GBK Writer
>>>>>>> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>>>>>>>
>>>>>>> In my naive prototype implementation, the coder was used to encode
>>>>>>> elements at the gRPC steps. This meant that the coder was
>>>>>>> encoding/decoding/encoding in the first stage and
>>>>>>> decoding/encoding/decoding in the second stage. This tripled the
>>>>>>>
>>>>>> amount
>>>>>
>>>>>> of
>>>>>>
>>>>>>> times the coder was being invoked per element. This additional use of
>>>>>>>
>>>>>> the
>>>>>
>>>>>> coder accounted for ~12% (80% of the 15%) of the extra execution time.
>>>>>>>
>>>>>> This
>>>>>>
>>>>>>> implementation is quite inefficient and would benefit from merging
>>>>>>> the
>>>>>>>
>>>>>> gRPC
>>>>>>
>>>>>>> Read + GBK Writer into one actor and also the GBK Reader + gRPC Write
>>>>>>>
>>>>>> into
>>>>>>
>>>>>>> another actor allowing for the creation of a fast path that can skip
>>>>>>>
>>>>>> parts
>>>>>>
>>>>>>> of the decode/encode cycle through the coder. By using a byte array
>>>>>>>
>>>>>> view
>>>>>
>>>>>> over the logical stream, one can minimize the number of byte array
>>>>>>>
>>>>>> copies
>>>>>
>>>>>> which plagued my naive implementation. This can be done by only
>>>>>>>
>>>>>> parsing
>>>>>
>>>>>> the
>>>>>>
>>>>>>> element boundaries out of the stream to produce those logical byte
>>>>>>>
>>>>>> array
>>>>>
>>>>>> views. I have a very rough estimate that performing this optimization
>>>>>>>
>>>>>> would
>>>>>>
>>>>>>> reduce the 12% overhead to somewhere between 4% and 6%.
>>>>>>>
>>>>>>> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>>>>>>> marshalling/unmarshalling protos
>>>>>>> handling/managing the socket
>>>>>>> flow control
>>>>>>> ...
>>>>>>>
>>>>>>> Finally, I did try experiments with different buffer sizes (10KB,
>>>>>>>
>>>>>> 100KB,
>>>>>
>>>>>> 1000KB), flow control (separate thread[1] vs same thread with
>>>>>>>
>>>>>> phaser[2]),
>>>>>
>>>>>> and channel type [3] (NIO, epoll, domain socket), but coder overhead
>>>>>>>
>>>>>> easily
>>>>>>
>>>>>>> dominated the differences in these other experiments.
>>>>>>>
>>>>>>> Further analysis would need to be done to more accurately distill
>>>>>>> this
>>>>>>> down.
>>>>>>>
>>>>>>> 1: https://github.com/lukecwik/incubator-beam/blob/
>>>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>>>>>
>>>>>> rness/stream/
>>>>>
>>>>>> BufferingStreamObserver.java
>>>>>>> 2: https://github.com/lukecwik/incubator-beam/blob/
>>>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>>>>>
>>>>>> rness/stream/
>>>>>
>>>>>> DirectStreamObserver.java
>>>>>>> 3: https://github.com/lukecwik/incubator-beam/blob/
>>>>>>>
>>>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>>>>
>>>>> rness/channel/
>>>>>
>>>>>> ManagedChannelFactory.java
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com>
>>>>>>>
>>>>>> wrote:
>>>>>
>>>>>> Awesome job Lukasz, Excellent, I have to confess the first time I
>>>>>>>>
>>>>>>> heard
>>>>>
>>>>>> about
>>>>>>>> the Fn API idea I was a bit incredulous, but you are making it real,
>>>>>>>> amazing!
>>>>>>>>
>>>>>>>> Just one question from your document, you said that 80% of the extra
>>>>>>>>
>>>>>>> (15%)
>>>>>>
>>>>>>> time
>>>>>>>> goes into encoding and decoding the data for your test case, can you
>>>>>>>> expand
>>>>>>>> in
>>>>>>>> your current ideas to improve this? (I am not sure I completely
>>>>>>>>
>>>>>>> understand
>>>>>>
>>>>>>> the
>>>>>>>> issue).
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik
>>>>>>>>
>>>>>>> <lc...@google.com.invalid>
>>>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>> Responded inline.
>>>>>>>>>
>>>>>>>>> On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com>
>>>>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This is truly amazing Luke!
>>>>>>>>>>
>>>>>>>>>> If I understand this right, the runner executing the DoFn will
>>>>>>>>>>
>>>>>>>>> delegate
>>>>>>>>
>>>>>>>>> the
>>>>>>>>>
>>>>>>>>>> function code and input data (and state, coders, etc.) to the
>>>>>>>>>>
>>>>>>>>> container
>>>>>>>>
>>>>>>>>> where it will execute with the user's SDK of choice, right ?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, that is correct.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I wonder how the containers relate to the underlying engine's
>>>>>>>>>>
>>>>>>>>> worker
>>>>>
>>>>>> processes ? is it a 1-1, container per worker ? if there's less
>>>>>>>>>>
>>>>>>>>> "work"
>>>>>>
>>>>>>> for
>>>>>>>>>
>>>>>>>>>> the worker's Java process (for example) now and it becomes a
>>>>>>>>>>
>>>>>>>>> sort of
>>>>>
>>>>>> "dispatcher", would that change the resource allocation commonly
>>>>>>>>>>
>>>>>>>>> used
>>>>>>
>>>>>>> for
>>>>>>>>
>>>>>>>>> the same Pipeline so that the worker processes would require less
>>>>>>>>>> resources, while giving those to the container ?
>>>>>>>>>>
>>>>>>>>>> I think with the four services (control, data, state, logging) you
>>>>>>>>>
>>>>>>>> can
>>>>>
>>>>>> go
>>>>>>>>
>>>>>>>>> with a 1-1 relationship or break it up more finely grained and
>>>>>>>>>
>>>>>>>> dedicate
>>>>>>
>>>>>>> some machines to have specific tasks. Like you could have a few
>>>>>>>>>
>>>>>>>> machines
>>>>>>
>>>>>>> dedicated to log aggregation which all the workers push their logs
>>>>>>>>>
>>>>>>>> to.
>>>>>
>>>>>> Similarly, you could have some machines that have a lot of memory
>>>>>>>>>
>>>>>>>> which
>>>>>>
>>>>>>> would be better to be able to do shuffles in memory and then this
>>>>>>>>>
>>>>>>>> cluster
>>>>>>>>
>>>>>>>>> of high memory machines could front the data service. I believe
>>>>>>>>>
>>>>>>>> there
>>>>>
>>>>>> is a
>>>>>>>>
>>>>>>>>> lot of flexibility based upon what a runner can do and what it
>>>>>>>>>
>>>>>>>> specializes
>>>>>>>>
>>>>>>>>> in and believe that with more effort comes more possibilities
>>>>>>>>>
>>>>>>>> albeit
>>>>>
>>>>>> with
>>>>>>>>
>>>>>>>>> increased internal complexity.
>>>>>>>>>
>>>>>>>>> The layout of resources depends on whether the services and SDK
>>>>>>>>>
>>>>>>>> containers
>>>>>>>>
>>>>>>>>> are co-hosted on the same machine or whether there is a different
>>>>>>>>> architecture in play. In a co-hosted configuration, it seems likely
>>>>>>>>>
>>>>>>>> that
>>>>>>
>>>>>>> the SDK container will get more resources but is dependent on the
>>>>>>>>>
>>>>>>>> runner
>>>>>>
>>>>>>> and pipeline shape (shuffle heavy dominated pipelines will look
>>>>>>>>>
>>>>>>>> different
>>>>>>>>
>>>>>>>>> then ParDo dominated pipelines).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> About executing sub-graphs, would it be true to say that as long
>>>>>>>>>>
>>>>>>>>> as
>>>>>
>>>>>> there's
>>>>>>>>>
>>>>>>>>>> no shuffle, you could keep executing in the same container ?
>>>>>>>>>>
>>>>>>>>> meaning
>>>>>
>>>>>> that
>>>>>>>>
>>>>>>>>> the graph is broken into sub-graphs by shuffles ?
>>>>>>>>>>
>>>>>>>>>> The only thing that is required is that the Apache Beam model is
>>>>>>>>>
>>>>>>>> preserved
>>>>>>>>
>>>>>>>>> so typical break points will be at shuffles and language crossing
>>>>>>>>>
>>>>>>>> points
>>>>>>
>>>>>>> (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
>>>>>>>>>
>>>>>>>> graph
>>>>>>>>
>>>>>>>>> even more for other reasons.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I have to dig-in deeper, so I could have more questions ;-)
>>>>>>>>>>
>>>>>>>>> thanks
>>>>>
>>>>>> Luke!
>>>>>>>>
>>>>>>>>> On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik
>>>>>>>>>>
>>>>>>>>> <lcwik@google.com.invalid
>>>>>>
>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> I updated the PR description to contain the same.
>>>>>>>>>>>
>>>>>>>>>>> I would start by looking at the API/object model definitions
>>>>>>>>>>>
>>>>>>>>>> found
>>>>>
>>>>>> in
>>>>>>>>
>>>>>>>>> beam_fn_api.proto
>>>>>>>>>>> <
>>>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>>>>>
>>>>>>>>>> sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>>>>>>>>>>
>>>>>>>>>>> Then depending on your interest, look at the following:
>>>>>>>>>>> * FnHarness.java
>>>>>>>>>>> <
>>>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>>>>>
>>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/
>>>>>>>>>>
>>>>>>>>> harness/FnHarness.java
>>>>>>>>>
>>>>>>>>>> is the main entry point.
>>>>>>>>>>> * org.apache.beam.fn.harness.data
>>>>>>>>>>> <
>>>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>>>>>
>>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
>>>>>>>>>>
>>>>>>>>>>> contains the most interesting bits of code since it is able to
>>>>>>>>>>>
>>>>>>>>>> multiplex
>>>>>>>>>
>>>>>>>>>> a
>>>>>>>>>>
>>>>>>>>>>> gRPC stream into multiple logical streams of elements bound for
>>>>>>>>>>>
>>>>>>>>>> multiple
>>>>>>>>>
>>>>>>>>>> concurrent process bundle requests. It also contains the code
>>>>>>>>>>>
>>>>>>>>>> to
>>>>>
>>>>>> take
>>>>>>>>
>>>>>>>>> multiple logical outbound streams and multiplex them back onto
>>>>>>>>>>>
>>>>>>>>>> a
>>>>>
>>>>>> gRPC
>>>>>>>>
>>>>>>>>> stream.
>>>>>>>>>>> * org.apache.beam.runners.core
>>>>>>>>>>> <
>>>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>>>>>
>>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/runners/core
>>>>>>>>>>
>>>>>>>>>>> contains additional runners akin to the DoFnRunner found in
>>>>>>>>>>>
>>>>>>>>>> runners-core
>>>>>>>>>
>>>>>>>>>> to
>>>>>>>>>>
>>>>>>>>>>> support sources and gRPC endpoints.
>>>>>>>>>>>
>>>>>>>>>>> Unless your really interested in how domain sockets, epoll, nio
>>>>>>>>>>>
>>>>>>>>>> channel
>>>>>>>>
>>>>>>>>> factories or how stream readiness callbacks work in gRPC, I
>>>>>>>>>>>
>>>>>>>>>> would
>>>>>
>>>>>> avoid
>>>>>>>>
>>>>>>>>> the
>>>>>>>>>>
>>>>>>>>>>> packages org.apache.beam.fn.harness.channel and
>>>>>>>>>>> org.apache.beam.fn.harness.stream. Similarly I would avoid
>>>>>>>>>>> org.apache.beam.fn.harness.fn and
>>>>>>>>>>>
>>>>>>>>>> org.apache.beam.fn.harness.fake
>>>>>
>>>>>> as
>>>>>>>>
>>>>>>>>> they
>>>>>>>>>>
>>>>>>>>>>> don't add anything meaningful to the api.
>>>>>>>>>>>
>>>>>>>>>>> Code package descriptions:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.beam.fn.harness.FnHarness: main entry point
>>>>>>>>>>> org.apache.beam.fn.harness.control: Control service client and
>>>>>>>>>>>
>>>>>>>>>> individual
>>>>>>>>>>
>>>>>>>>>>> request handlers
>>>>>>>>>>> org.apache.beam.fn.harness.data: Data service client and
>>>>>>>>>>>
>>>>>>>>>> logical
>>>>>
>>>>>> stream
>>>>>>>>>
>>>>>>>>>> multiplexing
>>>>>>>>>>> org.apache.beam.runners.core: Additional runners akin to the
>>>>>>>>>>>
>>>>>>>>>> DoFnRunner
>>>>>>>>
>>>>>>>>> found in runners-core to support sources and gRPC endpoints
>>>>>>>>>>> org.apache.beam.fn.harness.logging: Logging client
>>>>>>>>>>>
>>>>>>>>>> implementation
>>>>>
>>>>>> and
>>>>>>>>
>>>>>>>>> JUL
>>>>>>>>>>
>>>>>>>>>>> logging handler adapter
>>>>>>>>>>> org.apache.beam.fn.harness.channel: gRPC channel management
>>>>>>>>>>> org.apache.beam.fn.harness.stream: gRPC stream management
>>>>>>>>>>> org.apache.beam.fn.harness.fn: Java 8 functional interface
>>>>>>>>>>>
>>>>>>>>>> extensions
>>>>>>>>
>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
>>>>>>>>>>>
>>>>>>>>>> <klk@google.com.invalid
>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> This is awesome! Any chance you could roadmap the PR for us
>>>>>>>>>>>>
>>>>>>>>>>> with
>>>>>
>>>>>> some
>>>>>>>>
>>>>>>>>> links
>>>>>>>>>>>
>>>>>>>>>>>> into the most interesting bits?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>>>>>>>>>>>> robertwb@google.com.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Also, note that we can still support the "simple" case. For
>>>>>>>>>>>>>
>>>>>>>>>>>> example,
>>>>>>>>>
>>>>>>>>>> if the user supplies us with a jar file (as they do now) a
>>>>>>>>>>>>>
>>>>>>>>>>>> runner
>>>>>>>>
>>>>>>>>> could launch it as a subprocesses and communicate with it
>>>>>>>>>>>>>
>>>>>>>>>>>> via
>>>>>
>>>>>> this
>>>>>>>>
>>>>>>>>> same Fn API or install it in a fixed container itself--the
>>>>>>>>>>>>>
>>>>>>>>>>>> user
>>>>>>
>>>>>>> doesn't *need* to know about docker or manually manage
>>>>>>>>>>>>>
>>>>>>>>>>>> containers
>>>>>>>>
>>>>>>>>> (and
>>>>>>>>>>
>>>>>>>>>>> indeed the Fn API could be used in-process, cross-process,
>>>>>>>>>>>>> cross-container, and even cross-machine).
>>>>>>>>>>>>>
>>>>>>>>>>>>> However docker provides a nice cross-language way of
>>>>>>>>>>>>>
>>>>>>>>>>>> specifying
>>>>>>
>>>>>>> the
>>>>>>>>
>>>>>>>>> environment including all dependencies (especially for
>>>>>>>>>>>>>
>>>>>>>>>>>> languages
>>>>>>
>>>>>>> like
>>>>>>>>>
>>>>>>>>>> Python or C where the equivalent of a cross-platform,
>>>>>>>>>>>>>
>>>>>>>>>>>> self-contained
>>>>>>>>>
>>>>>>>>>> jar isn't as easy to produce) and is strictly more powerful
>>>>>>>>>>>>>
>>>>>>>>>>>> and
>>>>>>
>>>>>>> flexible (specifically it isolates the runtime environment
>>>>>>>>>>>>>
>>>>>>>>>>>> and
>>>>>
>>>>>> one
>>>>>>>>
>>>>>>>>> can
>>>>>>>>>>
>>>>>>>>>>> even use it for local testing).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Slicing a worker up like this without sacrificing
>>>>>>>>>>>>>
>>>>>>>>>>>> performance
>>>>>
>>>>>> is an
>>>>>>>>
>>>>>>>>> ambitious goal, but essential to the story of being able to
>>>>>>>>>>>>>
>>>>>>>>>>>> mix
>>>>>>
>>>>>>> and
>>>>>>>>
>>>>>>>>> match runners and SDKs arbitrarily, and I think this is a
>>>>>>>>>>>>>
>>>>>>>>>>>> great
>>>>>>
>>>>>>> start.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
>>>>>>>>>>>>>
>>>>>>>>>>>> <lcwik@google.com.invalid
>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Your correct, a docker container is created that contains
>>>>>>>>>>>>>>
>>>>>>>>>>>>> the
>>>>>>
>>>>>>> execution
>>>>>>>>>>>
>>>>>>>>>>>> environment the user wants or the user re-uses an
>>>>>>>>>>>>>>
>>>>>>>>>>>>> existing
>>>>>
>>>>>> one
>>>>>>
>>>>>>> (allowing
>>>>>>>>>>>>
>>>>>>>>>>>>> for a user to embed all their code/dependencies or use a
>>>>>>>>>>>>>>
>>>>>>>>>>>>> container
>>>>>>>>>
>>>>>>>>>> that
>>>>>>>>>>>
>>>>>>>>>>>> can
>>>>>>>>>>>>>
>>>>>>>>>>>>>> deploy code/dependencies on demand).
>>>>>>>>>>>>>> A user creates a pipeline saying which docker container
>>>>>>>>>>>>>>
>>>>>>>>>>>>> they
>>>>>
>>>>>> want
>>>>>>>>
>>>>>>>>> to
>>>>>>>>>>
>>>>>>>>>>> use
>>>>>>>>>>>>
>>>>>>>>>>>>> (this starts to allow for multiple container definitions
>>>>>>>>>>>>>>
>>>>>>>>>>>>> within a
>>>>>>>>
>>>>>>>>> single
>>>>>>>>>>>>
>>>>>>>>>>>>> pipeline to support multiple languages, versioning, ...).
>>>>>>>>>>>>>> A runner would then be responsible for launching one or
>>>>>>>>>>>>>>
>>>>>>>>>>>>> more
>>>>>
>>>>>> of
>>>>>>>>
>>>>>>>>> these
>>>>>>>>>>
>>>>>>>>>>> containers in a cluster manager of their choice (scaling
>>>>>>>>>>>>>>
>>>>>>>>>>>>> up
>>>>>
>>>>>> or
>>>>>>
>>>>>>> down
>>>>>>>>>
>>>>>>>>>> the
>>>>>>>>>>>
>>>>>>>>>>>> number of instances depending on demand/load/...).
>>>>>>>>>>>>>> A runner then interacts with the docker containers over
>>>>>>>>>>>>>>
>>>>>>>>>>>>> the
>>>>>
>>>>>> gRPC
>>>>>>>>
>>>>>>>>> service
>>>>>>>>>>>>
>>>>>>>>>>>>> definitions to delegate processing to.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
>>>>>>>>>>>>>>
>>>>>>>>>>>>> jb@nanthrax.net
>>>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Luke,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> that's really great and very promising !
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's really ambitious but I like the idea. Just to
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> clarify:
>>>>>
>>>>>> the
>>>>>>>>
>>>>>>>>> purpose
>>>>>>>>>>>>
>>>>>>>>>>>>> of
>>>>>>>>>>>>>
>>>>>>>>>>>>>> using gRPC is once the docker container is running,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> then we
>>>>>
>>>>>> can
>>>>>>>>
>>>>>>>>> "interact"
>>>>>>>>>>>>>
>>>>>>>>>>>>>> with the container to spread and delegate processing to
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> the
>>>>>
>>>>>> docker
>>>>>>>>>
>>>>>>>>>> container, correct ?
>>>>>>>>>>>>>>> The users/devops have to setup the docker containers as
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> prerequisite.
>>>>>>>>>>>
>>>>>>>>>>>> Then, the "location" of the containers (kind of
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> container
>>>>>
>>>>>> registry)
>>>>>>>>>>
>>>>>>>>>>> is
>>>>>>>>>>>
>>>>>>>>>>>> set
>>>>>>>>>>>>>
>>>>>>>>>>>>>> via the pipeline options and used by gRPC ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Luke !
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have been prototyping several components towards the
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Beam
>>>>>>
>>>>>>> technical
>>>>>>>>>>>
>>>>>>>>>>>> vision of being able to execute an arbitrary language
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> using
>>>>>>
>>>>>>> an
>>>>>>>>
>>>>>>>>> arbitrary
>>>>>>>>>>>>>
>>>>>>>>>>>>>> runner.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would like to share this overview [1] of what I have
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> been
>>>>>>
>>>>>>> working
>>>>>>>>>>
>>>>>>>>>>> towards. I also share this PR [2] with a proposed API,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> service
>>>>>>>>
>>>>>>>>> definitions
>>>>>>>>>>>>>
>>>>>>>>>>>>>> and partial implementation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1: https://s.apache.org/beam-fn-api
>>>>>>>>>>>>>>>> 2: https://github.com/apache/beam/pull/1801
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please comment on the overview within this thread, and
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> any
>>>>>
>>>>>> specific
>>>>>>>>>>
>>>>>>>>>>> code
>>>>>>>>>>>>>
>>>>>>>>>>>>>> comments on the PR directly.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Luke
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>>> jbonofre@apache.org
>>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>
>>>>
>

Re: Beam Fn API

Posted by Etienne Chauchot <ec...@gmail.com>.
Thanks for all these docs! They are exactly what was needed for new 
contributors as discussed in this thread

https://lists.apache.org/thread.html/ac93d29424e19d57097373b78f3f5bcbc701e4b51385a52a6e27b7ed@%3Cdev.beam.apache.org%3E

Etienne

Le 31/05/2017 à 11:12, Aljoscha Krettek a écrit :
> Thanks for banging these out Lukasz. I’ll try and read them all this week.
>
> We’re also planning to add support for the Fn API to the Flink Runner so that we can execute Python programs. I’m sure we’ll get some valuable feedback for you while doing that.
>
>> On 26. May 2017, at 22:49, Lukasz Cwik <lc...@google.com.INVALID> wrote:
>>
>> I would like to share another document about the Fn API. This document
>> specifically discusses how to access side inputs, access remote references
>> (e.g. large iterables for hot keys produced by a GBK), and support user
>> state.
>> https://s.apache.org/beam-fn-state-api-and-bundle-processing
>>
>> The document does require a strong foundation in the Apache Beam model and
>> a good understanding of the prior shared docs:
>> * How to process a bundle: https://s.apache.org/beam-fn-api
>> -processing-a-bundle
>> * How to send and receive data: https://s.apache.org/beam-fn-api
>> -send-and-receive-data
>>
>> I could really use the help of runner contributors to review the caching
>> semantics within the SDK harness and whether they would work well for the
>> runner they contribute to the most.
>>
>> On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Manu, the goal is to share here initially, update the docs addressing
>>> people's comments, and then publish them on the website once they are
>>> stable enough.
>>>
>>> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Lukasz. The following two links were somehow incorrectly formatted
>>>> in your mail.
>>>>
>>>> * How to process a bundle:
>>>> https://s.apache.org/beam-fn-api-processing-a-bundle
>>>> * How to send and receive data:
>>>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>>>
>>>> By the way, is there a way to find them from the Beam website ?
>>>>
>>>>
>>>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>>>> wrote:
>>>>
>>>>> Now that I'm back from vacation and the 2.0.0 release is not taking all
>>>> my
>>>>> time, I am focusing my attention on working on the Beam Portability
>>>>> framework, specifically the Fn API so that we can get Python and other
>>>>> language integrations work with any runner.
>>>>>
>>>>> For new comers, I would like to reshare the overview:
>>>>> https://s.apache.org/beam-fn-api
>>>>>
>>>>> And for those of you who have been following this thread and
>>>> contributors
>>>>> focusing on Runner integration with Apache Beam:
>>>>> * How to process a bundle: https://s.apache.org/beam-fn-a
>>>> pi-processing-a-
>>>>> bundle
>>>>> * How to send and receive data: https://s.apache.org/
>>>>> beam-fn-api-send-and-receive-data
>>>>>
>>>>> If you want to dive deeper, you should look at:
>>>>> * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>>>>> common/runner-api/src/main/proto/beam_runner_api.proto
>>>>> <https://github.com/apache/beam/blob/master/sdks/common/runn
>>>> er-api/src/main/proto/beam_runner_api.proto>
>>>>> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>>>>> common/fn-api/src/main/proto/beam_fn_api.proto
>>>>> <https://github.com/apache/beam/blob/master/sdks/common/fn-
>>>> api/src/main/proto/beam_fn_api.proto>
>>>>> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>>>>> java/harness
>>>>> <https://github.com/apache/beam/tree/master/sdks/java/harness>
>>>>> * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>>>>> python/apache_beam/runners/worker
>>>>> <https://github.com/apache/beam/tree/master/sdks/python/apac
>>>> he_beam/runners/worker>
>>>>> Next I'm planning on talking about Beam Fn State API and will need help
>>>>> from Runner contributors to talk about caching semantics and key spaces
>>>> and
>>>>> whether the integrations mesh well with current Runner implementations.
>>>> The
>>>>> State API is meant to support user state, side inputs, and re-iteration
>>>> for
>>>>> large values produced by GroupByKey.
>>>>>
>>>>> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Yes, I was using a Pipeline that was:
>>>>>> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a
>>>> batch
>>>>>> pipeline in the global window using the default trigger)
>>>>>>
>>>>>> In Google Cloud Dataflow, the shuffle step uses the binary
>>>> representation
>>>>>> to compare keys, so the above pipeline would normally be converted to
>>>> the
>>>>>> following two stages:
>>>>>> Read -> GBK Writer
>>>>>> GBK Reader -> IdentityParDo
>>>>>>
>>>>>> Note that the GBK Writer and GBK Reader need to use a coder to encode
>>>> and
>>>>>> decode the value.
>>>>>>
>>>>>> When using the Fn API, those two stages expanded because of the Fn Api
>>>>>> crossings using a gRPC Write/Read pair:
>>>>>> Read -> gRPC Write -> gRPC Read -> GBK Writer
>>>>>> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>>>>>>
>>>>>> In my naive prototype implementation, the coder was used to encode
>>>>>> elements at the gRPC steps. This meant that the coder was
>>>>>> encoding/decoding/encoding in the first stage and
>>>>>> decoding/encoding/decoding in the second stage. This tripled the
>>>> amount
>>>>> of
>>>>>> times the coder was being invoked per element. This additional use of
>>>> the
>>>>>> coder accounted for ~12% (80% of the 15%) of the extra execution time.
>>>>> This
>>>>>> implementation is quite inefficient and would benefit from merging the
>>>>> gRPC
>>>>>> Read + GBK Writer into one actor and also the GBK Reader + gRPC Write
>>>>> into
>>>>>> another actor allowing for the creation of a fast path that can skip
>>>>> parts
>>>>>> of the decode/encode cycle through the coder. By using a byte array
>>>> view
>>>>>> over the logical stream, one can minimize the number of byte array
>>>> copies
>>>>>> which plagued my naive implementation. This can be done by only
>>>> parsing
>>>>> the
>>>>>> element boundaries out of the stream to produce those logical byte
>>>> array
>>>>>> views. I have a very rough estimate that performing this optimization
>>>>> would
>>>>>> reduce the 12% overhead to somewhere between 4% and 6%.
>>>>>>
>>>>>> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>>>>>> marshalling/unmarshalling protos
>>>>>> handling/managing the socket
>>>>>> flow control
>>>>>> ...
>>>>>>
>>>>>> Finally, I did try experiments with different buffer sizes (10KB,
>>>> 100KB,
>>>>>> 1000KB), flow control (separate thread[1] vs same thread with
>>>> phaser[2]),
>>>>>> and channel type [3] (NIO, epoll, domain socket), but coder overhead
>>>>> easily
>>>>>> dominated the differences in these other experiments.
>>>>>>
>>>>>> Further analysis would need to be done to more accurately distill this
>>>>>> down.
>>>>>>
>>>>>> 1: https://github.com/lukecwik/incubator-beam/blob/
>>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>> rness/stream/
>>>>>> BufferingStreamObserver.java
>>>>>> 2: https://github.com/lukecwik/incubator-beam/blob/
>>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>> rness/stream/
>>>>>> DirectStreamObserver.java
>>>>>> 3: https://github.com/lukecwik/incubator-beam/blob/
>>>>>>
>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>>> rness/channel/
>>>>>> ManagedChannelFactory.java
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com>
>>>> wrote:
>>>>>>> Awesome job Lukasz, Excellent, I have to confess the first time I
>>>> heard
>>>>>>> about
>>>>>>> the Fn API idea I was a bit incredulous, but you are making it real,
>>>>>>> amazing!
>>>>>>>
>>>>>>> Just one question from your document, you said that 80% of the extra
>>>>> (15%)
>>>>>>> time
>>>>>>> goes into encoding and decoding the data for your test case, can you
>>>>>>> expand
>>>>>>> in
>>>>>>> your current ideas to improve this? (I am not sure I completely
>>>>> understand
>>>>>>> the
>>>>>>> issue).
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik
>>>> <lc...@google.com.invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Responded inline.
>>>>>>>>
>>>>>>>> On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com>
>>>>>>> wrote:
>>>>>>>>> This is truly amazing Luke!
>>>>>>>>>
>>>>>>>>> If I understand this right, the runner executing the DoFn will
>>>>>>> delegate
>>>>>>>> the
>>>>>>>>> function code and input data (and state, coders, etc.) to the
>>>>>>> container
>>>>>>>>> where it will execute with the user's SDK of choice, right ?
>>>>>>>>
>>>>>>>> Yes, that is correct.
>>>>>>>>
>>>>>>>>
>>>>>>>>> I wonder how the containers relate to the underlying engine's
>>>> worker
>>>>>>>>> processes ? is it a 1-1, container per worker ? if there's less
>>>>> "work"
>>>>>>>> for
>>>>>>>>> the worker's Java process (for example) now and it becomes a
>>>> sort of
>>>>>>>>> "dispatcher", would that change the resource allocation commonly
>>>>> used
>>>>>>> for
>>>>>>>>> the same Pipeline so that the worker processes would require less
>>>>>>>>> resources, while giving those to the container ?
>>>>>>>>>
>>>>>>>> I think with the four services (control, data, state, logging) you
>>>> can
>>>>>>> go
>>>>>>>> with a 1-1 relationship or break it up more finely grained and
>>>>> dedicate
>>>>>>>> some machines to have specific tasks. Like you could have a few
>>>>> machines
>>>>>>>> dedicated to log aggregation which all the workers push their logs
>>>> to.
>>>>>>>> Similarly, you could have some machines that have a lot of memory
>>>>> which
>>>>>>>> would be better to be able to do shuffles in memory and then this
>>>>>>> cluster
>>>>>>>> of high memory machines could front the data service. I believe
>>>> there
>>>>>>> is a
>>>>>>>> lot of flexibility based upon what a runner can do and what it
>>>>>>> specializes
>>>>>>>> in and believe that with more effort comes more possibilities
>>>> albeit
>>>>>>> with
>>>>>>>> increased internal complexity.
>>>>>>>>
>>>>>>>> The layout of resources depends on whether the services and SDK
>>>>>>> containers
>>>>>>>> are co-hosted on the same machine or whether there is a different
>>>>>>>> architecture in play. In a co-hosted configuration, it seems likely
>>>>> that
>>>>>>>> the SDK container will get more resources but is dependent on the
>>>>> runner
>>>>>>>> and pipeline shape (shuffle heavy dominated pipelines will look
>>>>>>> different
>>>>>>>> then ParDo dominated pipelines).
>>>>>>>>
>>>>>>>>
>>>>>>>>> About executing sub-graphs, would it be true to say that as long
>>>> as
>>>>>>>> there's
>>>>>>>>> no shuffle, you could keep executing in the same container ?
>>>> meaning
>>>>>>> that
>>>>>>>>> the graph is broken into sub-graphs by shuffles ?
>>>>>>>>>
>>>>>>>> The only thing that is required is that the Apache Beam model is
>>>>>>> preserved
>>>>>>>> so typical break points will be at shuffles and language crossing
>>>>> points
>>>>>>>> (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
>>>>>>> graph
>>>>>>>> even more for other reasons.
>>>>>>>>
>>>>>>>>
>>>>>>>>> I have to dig-in deeper, so I could have more questions ;-)
>>>> thanks
>>>>>>> Luke!
>>>>>>>>> On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik
>>>>> <lcwik@google.com.invalid
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I updated the PR description to contain the same.
>>>>>>>>>>
>>>>>>>>>> I would start by looking at the API/object model definitions
>>>> found
>>>>>>> in
>>>>>>>>>> beam_fn_api.proto
>>>>>>>>>> <
>>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>>> sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>>>>>>>>>> Then depending on your interest, look at the following:
>>>>>>>>>> * FnHarness.java
>>>>>>>>>> <
>>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/
>>>>>>>> harness/FnHarness.java
>>>>>>>>>> is the main entry point.
>>>>>>>>>> * org.apache.beam.fn.harness.data
>>>>>>>>>> <
>>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
>>>>>>>>>> contains the most interesting bits of code since it is able to
>>>>>>>> multiplex
>>>>>>>>> a
>>>>>>>>>> gRPC stream into multiple logical streams of elements bound for
>>>>>>>> multiple
>>>>>>>>>> concurrent process bundle requests. It also contains the code
>>>> to
>>>>>>> take
>>>>>>>>>> multiple logical outbound streams and multiplex them back onto
>>>> a
>>>>>>> gRPC
>>>>>>>>>> stream.
>>>>>>>>>> * org.apache.beam.runners.core
>>>>>>>>>> <
>>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/runners/core
>>>>>>>>>> contains additional runners akin to the DoFnRunner found in
>>>>>>>> runners-core
>>>>>>>>> to
>>>>>>>>>> support sources and gRPC endpoints.
>>>>>>>>>>
>>>>>>>>>> Unless your really interested in how domain sockets, epoll, nio
>>>>>>> channel
>>>>>>>>>> factories or how stream readiness callbacks work in gRPC, I
>>>> would
>>>>>>> avoid
>>>>>>>>> the
>>>>>>>>>> packages org.apache.beam.fn.harness.channel and
>>>>>>>>>> org.apache.beam.fn.harness.stream. Similarly I would avoid
>>>>>>>>>> org.apache.beam.fn.harness.fn and
>>>> org.apache.beam.fn.harness.fake
>>>>>>> as
>>>>>>>>> they
>>>>>>>>>> don't add anything meaningful to the api.
>>>>>>>>>>
>>>>>>>>>> Code package descriptions:
>>>>>>>>>>
>>>>>>>>>> org.apache.beam.fn.harness.FnHarness: main entry point
>>>>>>>>>> org.apache.beam.fn.harness.control: Control service client and
>>>>>>>>> individual
>>>>>>>>>> request handlers
>>>>>>>>>> org.apache.beam.fn.harness.data: Data service client and
>>>> logical
>>>>>>>> stream
>>>>>>>>>> multiplexing
>>>>>>>>>> org.apache.beam.runners.core: Additional runners akin to the
>>>>>>> DoFnRunner
>>>>>>>>>> found in runners-core to support sources and gRPC endpoints
>>>>>>>>>> org.apache.beam.fn.harness.logging: Logging client
>>>> implementation
>>>>>>> and
>>>>>>>>> JUL
>>>>>>>>>> logging handler adapter
>>>>>>>>>> org.apache.beam.fn.harness.channel: gRPC channel management
>>>>>>>>>> org.apache.beam.fn.harness.stream: gRPC stream management
>>>>>>>>>> org.apache.beam.fn.harness.fn: Java 8 functional interface
>>>>>>> extensions
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
>>>>>>>> <klk@google.com.invalid
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is awesome! Any chance you could roadmap the PR for us
>>>> with
>>>>>>> some
>>>>>>>>>> links
>>>>>>>>>>> into the most interesting bits?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>>>>>>>>>>> robertwb@google.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Also, note that we can still support the "simple" case. For
>>>>>>>> example,
>>>>>>>>>>>> if the user supplies us with a jar file (as they do now) a
>>>>>>> runner
>>>>>>>>>>>> could launch it as a subprocesses and communicate with it
>>>> via
>>>>>>> this
>>>>>>>>>>>> same Fn API or install it in a fixed container itself--the
>>>>> user
>>>>>>>>>>>> doesn't *need* to know about docker or manually manage
>>>>>>> containers
>>>>>>>>> (and
>>>>>>>>>>>> indeed the Fn API could be used in-process, cross-process,
>>>>>>>>>>>> cross-container, and even cross-machine).
>>>>>>>>>>>>
>>>>>>>>>>>> However docker provides a nice cross-language way of
>>>>> specifying
>>>>>>> the
>>>>>>>>>>>> environment including all dependencies (especially for
>>>>> languages
>>>>>>>> like
>>>>>>>>>>>> Python or C where the equivalent of a cross-platform,
>>>>>>>> self-contained
>>>>>>>>>>>> jar isn't as easy to produce) and is strictly more powerful
>>>>> and
>>>>>>>>>>>> flexible (specifically it isolates the runtime environment
>>>> and
>>>>>>> one
>>>>>>>>> can
>>>>>>>>>>>> even use it for local testing).
>>>>>>>>>>>>
>>>>>>>>>>>> Slicing a worker up like this without sacrificing
>>>> performance
>>>>>>> is an
>>>>>>>>>>>> ambitious goal, but essential to the story of being able to
>>>>> mix
>>>>>>> and
>>>>>>>>>>>> match runners and SDKs arbitrarily, and I think this is a
>>>>> great
>>>>>>>>> start.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
>>>>>>>>> <lcwik@google.com.invalid
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Your correct, a docker container is created that contains
>>>>> the
>>>>>>>>>> execution
>>>>>>>>>>>>> environment the user wants or the user re-uses an
>>>> existing
>>>>> one
>>>>>>>>>>> (allowing
>>>>>>>>>>>>> for a user to embed all their code/dependencies or use a
>>>>>>>> container
>>>>>>>>>> that
>>>>>>>>>>>> can
>>>>>>>>>>>>> deploy code/dependencies on demand).
>>>>>>>>>>>>> A user creates a pipeline saying which docker container
>>>> they
>>>>>>> want
>>>>>>>>> to
>>>>>>>>>>> use
>>>>>>>>>>>>> (this starts to allow for multiple container definitions
>>>>>>> within a
>>>>>>>>>>> single
>>>>>>>>>>>>> pipeline to support multiple languages, versioning, ...).
>>>>>>>>>>>>> A runner would then be responsible for launching one or
>>>> more
>>>>>>> of
>>>>>>>>> these
>>>>>>>>>>>>> containers in a cluster manager of their choice (scaling
>>>> up
>>>>> or
>>>>>>>> down
>>>>>>>>>> the
>>>>>>>>>>>>> number of instances depending on demand/load/...).
>>>>>>>>>>>>> A runner then interacts with the docker containers over
>>>> the
>>>>>>> gRPC
>>>>>>>>>>> service
>>>>>>>>>>>>> definitions to delegate processing to.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
>>>>>>>>>> jb@nanthrax.net
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Luke,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> that's really great and very promising !
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It's really ambitious but I like the idea. Just to
>>>> clarify:
>>>>>>> the
>>>>>>>>>>> purpose
>>>>>>>>>>>> of
>>>>>>>>>>>>>> using gRPC is once the docker container is running,
>>>> then we
>>>>>>> can
>>>>>>>>>>>> "interact"
>>>>>>>>>>>>>> with the container to spread and delegate processing to
>>>> the
>>>>>>>> docker
>>>>>>>>>>>>>> container, correct ?
>>>>>>>>>>>>>> The users/devops have to setup the docker containers as
>>>>>>>>>> prerequisite.
>>>>>>>>>>>>>> Then, the "location" of the containers (kind of
>>>> container
>>>>>>>>> registry)
>>>>>>>>>> is
>>>>>>>>>>>> set
>>>>>>>>>>>>>> via the pipeline options and used by gRPC ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Luke !
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have been prototyping several components towards the
>>>>> Beam
>>>>>>>>>> technical
>>>>>>>>>>>>>>> vision of being able to execute an arbitrary language
>>>>> using
>>>>>>> an
>>>>>>>>>>>> arbitrary
>>>>>>>>>>>>>>> runner.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would like to share this overview [1] of what I have
>>>>> been
>>>>>>>>> working
>>>>>>>>>>>>>>> towards. I also share this PR [2] with a proposed API,
>>>>>>> service
>>>>>>>>>>>> definitions
>>>>>>>>>>>>>>> and partial implementation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1: https://s.apache.org/beam-fn-api
>>>>>>>>>>>>>>> 2: https://github.com/apache/beam/pull/1801
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please comment on the overview within this thread, and
>>>> any
>>>>>>>>> specific
>>>>>>>>>>>> code
>>>>>>>>>>>>>>> comments on the PR directly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Luke
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>> jbonofre@apache.org
>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>>>
>>>>>>
>>>


Re: Beam Fn API

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks for banging these out Lukasz. I’ll try and read them all this week.

We’re also planning to add support for the Fn API to the Flink Runner so that we can execute Python programs. I’m sure we’ll get some valuable feedback for you while doing that.

> On 26. May 2017, at 22:49, Lukasz Cwik <lc...@google.com.INVALID> wrote:
> 
> I would like to share another document about the Fn API. This document
> specifically discusses how to access side inputs, access remote references
> (e.g. large iterables for hot keys produced by a GBK), and support user
> state.
> https://s.apache.org/beam-fn-state-api-and-bundle-processing
> 
> The document does require a strong foundation in the Apache Beam model and
> a good understanding of the prior shared docs:
> * How to process a bundle: https://s.apache.org/beam-fn-api
> -processing-a-bundle
> * How to send and receive data: https://s.apache.org/beam-fn-api
> -send-and-receive-data
> 
> I could really use the help of runner contributors to review the caching
> semantics within the SDK harness and whether they would work well for the
> runner they contribute to the most.
> 
> On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:
> 
>> Manu, the goal is to share here initially, update the docs addressing
>> people's comments, and then publish them on the website once they are
>> stable enough.
>> 
>> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com>
>> wrote:
>> 
>>> Thanks Lukasz. The following two links were somehow incorrectly formatted
>>> in your mail.
>>> 
>>> * How to process a bundle:
>>> https://s.apache.org/beam-fn-api-processing-a-bundle
>>> * How to send and receive data:
>>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>> 
>>> By the way, is there a way to find them from the Beam website ?
>>> 
>>> 
>>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>>> wrote:
>>> 
>>>> Now that I'm back from vacation and the 2.0.0 release is not taking all
>>> my
>>>> time, I am focusing my attention on working on the Beam Portability
>>>> framework, specifically the Fn API so that we can get Python and other
>>>> language integrations work with any runner.
>>>> 
>>>> For new comers, I would like to reshare the overview:
>>>> https://s.apache.org/beam-fn-api
>>>> 
>>>> And for those of you who have been following this thread and
>>> contributors
>>>> focusing on Runner integration with Apache Beam:
>>>> * How to process a bundle: https://s.apache.org/beam-fn-a
>>> pi-processing-a-
>>>> bundle
>>>> * How to send and receive data: https://s.apache.org/
>>>> beam-fn-api-send-and-receive-data
>>>> 
>>>> If you want to dive deeper, you should look at:
>>>> * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>>>> common/runner-api/src/main/proto/beam_runner_api.proto
>>>> <https://github.com/apache/beam/blob/master/sdks/common/runn
>>> er-api/src/main/proto/beam_runner_api.proto>
>>>> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>>>> common/fn-api/src/main/proto/beam_fn_api.proto
>>>> <https://github.com/apache/beam/blob/master/sdks/common/fn-
>>> api/src/main/proto/beam_fn_api.proto>
>>>> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>>>> java/harness
>>>> <https://github.com/apache/beam/tree/master/sdks/java/harness>
>>>> * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>>>> python/apache_beam/runners/worker
>>>> <https://github.com/apache/beam/tree/master/sdks/python/apac
>>> he_beam/runners/worker>
>>>> 
>>>> Next I'm planning on talking about Beam Fn State API and will need help
>>>> from Runner contributors to talk about caching semantics and key spaces
>>> and
>>>> whether the integrations mesh well with current Runner implementations.
>>> The
>>>> State API is meant to support user state, side inputs, and re-iteration
>>> for
>>>> large values produced by GroupByKey.
>>>> 
>>>> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>> 
>>>>> Yes, I was using a Pipeline that was:
>>>>> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a
>>> batch
>>>>> pipeline in the global window using the default trigger)
>>>>> 
>>>>> In Google Cloud Dataflow, the shuffle step uses the binary
>>> representation
>>>>> to compare keys, so the above pipeline would normally be converted to
>>> the
>>>>> following two stages:
>>>>> Read -> GBK Writer
>>>>> GBK Reader -> IdentityParDo
>>>>> 
>>>>> Note that the GBK Writer and GBK Reader need to use a coder to encode
>>> and
>>>>> decode the value.
>>>>> 
>>>>> When using the Fn API, those two stages expanded because of the Fn Api
>>>>> crossings using a gRPC Write/Read pair:
>>>>> Read -> gRPC Write -> gRPC Read -> GBK Writer
>>>>> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>>>>> 
>>>>> In my naive prototype implementation, the coder was used to encode
>>>>> elements at the gRPC steps. This meant that the coder was
>>>>> encoding/decoding/encoding in the first stage and
>>>>> decoding/encoding/decoding in the second stage. This tripled the
>>> amount
>>>> of
>>>>> times the coder was being invoked per element. This additional use of
>>> the
>>>>> coder accounted for ~12% (80% of the 15%) of the extra execution time.
>>>> This
>>>>> implementation is quite inefficient and would benefit from merging the
>>>> gRPC
>>>>> Read + GBK Writer into one actor and also the GBK Reader + gRPC Write
>>>> into
>>>>> another actor allowing for the creation of a fast path that can skip
>>>> parts
>>>>> of the decode/encode cycle through the coder. By using a byte array
>>> view
>>>>> over the logical stream, one can minimize the number of byte array
>>> copies
>>>>> which plagued my naive implementation. This can be done by only
>>> parsing
>>>> the
>>>>> element boundaries out of the stream to produce those logical byte
>>> array
>>>>> views. I have a very rough estimate that performing this optimization
>>>> would
>>>>> reduce the 12% overhead to somewhere between 4% and 6%.
>>>>> 
>>>>> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>>>>> marshalling/unmarshalling protos
>>>>> handling/managing the socket
>>>>> flow control
>>>>> ...
>>>>> 
>>>>> Finally, I did try experiments with different buffer sizes (10KB,
>>> 100KB,
>>>>> 1000KB), flow control (separate thread[1] vs same thread with
>>> phaser[2]),
>>>>> and channel type [3] (NIO, epoll, domain socket), but coder overhead
>>>> easily
>>>>> dominated the differences in these other experiments.
>>>>> 
>>>>> Further analysis would need to be done to more accurately distill this
>>>>> down.
>>>>> 
>>>>> 1: https://github.com/lukecwik/incubator-beam/blob/
>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>> rness/stream/
>>>>> BufferingStreamObserver.java
>>>>> 2: https://github.com/lukecwik/incubator-beam/blob/
>>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>> rness/stream/
>>>>> DirectStreamObserver.java
>>>>> 3: https://github.com/lukecwik/incubator-beam/blob/
>>>>> 
>>>> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>>> rness/channel/
>>>>> ManagedChannelFactory.java
>>>>> 
>>>>> 
>>>>> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com>
>>> wrote:
>>>>> 
>>>>>> Awesome job Lukasz, Excellent, I have to confess the first time I
>>> heard
>>>>>> about
>>>>>> the Fn API idea I was a bit incredulous, but you are making it real,
>>>>>> amazing!
>>>>>> 
>>>>>> Just one question from your document, you said that 80% of the extra
>>>> (15%)
>>>>>> time
>>>>>> goes into encoding and decoding the data for your test case, can you
>>>>>> expand
>>>>>> in
>>>>>> your current ideas to improve this? (I am not sure I completely
>>>> understand
>>>>>> the
>>>>>> issue).
>>>>>> 
>>>>>> 
>>>>>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik
>>> <lc...@google.com.invalid>
>>>>>> wrote:
>>>>>> 
>>>>>>> Responded inline.
>>>>>>> 
>>>>>>> On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> This is truly amazing Luke!
>>>>>>>> 
>>>>>>>> If I understand this right, the runner executing the DoFn will
>>>>>> delegate
>>>>>>> the
>>>>>>>> function code and input data (and state, coders, etc.) to the
>>>>>> container
>>>>>>>> where it will execute with the user's SDK of choice, right ?
>>>>>>> 
>>>>>>> 
>>>>>>> Yes, that is correct.
>>>>>>> 
>>>>>>> 
>>>>>>>> I wonder how the containers relate to the underlying engine's
>>> worker
>>>>>>>> processes ? is it a 1-1, container per worker ? if there's less
>>>> "work"
>>>>>>> for
>>>>>>>> the worker's Java process (for example) now and it becomes a
>>> sort of
>>>>>>>> "dispatcher", would that change the resource allocation commonly
>>>> used
>>>>>> for
>>>>>>>> the same Pipeline so that the worker processes would require less
>>>>>>>> resources, while giving those to the container ?
>>>>>>>> 
>>>>>>> 
>>>>>>> I think with the four services (control, data, state, logging) you
>>> can
>>>>>> go
>>>>>>> with a 1-1 relationship or break it up more finely grained and
>>>> dedicate
>>>>>>> some machines to have specific tasks. Like you could have a few
>>>> machines
>>>>>>> dedicated to log aggregation which all the workers push their logs
>>> to.
>>>>>>> Similarly, you could have some machines that have a lot of memory
>>>> which
>>>>>>> would be better to be able to do shuffles in memory and then this
>>>>>> cluster
>>>>>>> of high memory machines could front the data service. I believe
>>> there
>>>>>> is a
>>>>>>> lot of flexibility based upon what a runner can do and what it
>>>>>> specializes
>>>>>>> in and believe that with more effort comes more possibilities
>>> albeit
>>>>>> with
>>>>>>> increased internal complexity.
>>>>>>> 
>>>>>>> The layout of resources depends on whether the services and SDK
>>>>>> containers
>>>>>>> are co-hosted on the same machine or whether there is a different
>>>>>>> architecture in play. In a co-hosted configuration, it seems likely
>>>> that
>>>>>>> the SDK container will get more resources but is dependent on the
>>>> runner
>>>>>>> and pipeline shape (shuffle heavy dominated pipelines will look
>>>>>> different
>>>>>>> then ParDo dominated pipelines).
>>>>>>> 
>>>>>>> 
>>>>>>>> About executing sub-graphs, would it be true to say that as long
>>> as
>>>>>>> there's
>>>>>>>> no shuffle, you could keep executing in the same container ?
>>> meaning
>>>>>> that
>>>>>>>> the graph is broken into sub-graphs by shuffles ?
>>>>>>>> 
>>>>>>> 
>>>>>>> The only thing that is required is that the Apache Beam model is
>>>>>> preserved
>>>>>>> so typical break points will be at shuffles and language crossing
>>>> points
>>>>>>> (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
>>>>>> graph
>>>>>>> even more for other reasons.
>>>>>>> 
>>>>>>> 
>>>>>>>> I have to dig-in deeper, so I could have more questions ;-)
>>> thanks
>>>>>> Luke!
>>>>>>>> 
>>>>>>>> On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik
>>>> <lcwik@google.com.invalid
>>>>>>> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> I updated the PR description to contain the same.
>>>>>>>>> 
>>>>>>>>> I would start by looking at the API/object model definitions
>>> found
>>>>>> in
>>>>>>>>> beam_fn_api.proto
>>>>>>>>> <
>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>> sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Then depending on your interest, look at the following:
>>>>>>>>> * FnHarness.java
>>>>>>>>> <
>>>>>>>>> https://github.com/lukecwik/incubator-beam/blob/fn_api/
>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/
>>>>>>> harness/FnHarness.java
>>>>>>>>>> 
>>>>>>>>> is the main entry point.
>>>>>>>>> * org.apache.beam.fn.harness.data
>>>>>>>>> <
>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
>>>>>>>>>> 
>>>>>>>>> contains the most interesting bits of code since it is able to
>>>>>>> multiplex
>>>>>>>> a
>>>>>>>>> gRPC stream into multiple logical streams of elements bound for
>>>>>>> multiple
>>>>>>>>> concurrent process bundle requests. It also contains the code
>>> to
>>>>>> take
>>>>>>>>> multiple logical outbound streams and multiplex them back onto
>>> a
>>>>>> gRPC
>>>>>>>>> stream.
>>>>>>>>> * org.apache.beam.runners.core
>>>>>>>>> <
>>>>>>>>> https://github.com/lukecwik/incubator-beam/tree/fn_api/
>>>>>>>> sdks/java/harness/src/main/java/org/apache/beam/runners/core
>>>>>>>>>> 
>>>>>>>>> contains additional runners akin to the DoFnRunner found in
>>>>>>> runners-core
>>>>>>>> to
>>>>>>>>> support sources and gRPC endpoints.
>>>>>>>>> 
>>>>>>>>> Unless your really interested in how domain sockets, epoll, nio
>>>>>> channel
>>>>>>>>> factories or how stream readiness callbacks work in gRPC, I
>>> would
>>>>>> avoid
>>>>>>>> the
>>>>>>>>> packages org.apache.beam.fn.harness.channel and
>>>>>>>>> org.apache.beam.fn.harness.stream. Similarly I would avoid
>>>>>>>>> org.apache.beam.fn.harness.fn and
>>> org.apache.beam.fn.harness.fake
>>>>>> as
>>>>>>>> they
>>>>>>>>> don't add anything meaningful to the api.
>>>>>>>>> 
>>>>>>>>> Code package descriptions:
>>>>>>>>> 
>>>>>>>>> org.apache.beam.fn.harness.FnHarness: main entry point
>>>>>>>>> org.apache.beam.fn.harness.control: Control service client and
>>>>>>>> individual
>>>>>>>>> request handlers
>>>>>>>>> org.apache.beam.fn.harness.data: Data service client and
>>> logical
>>>>>>> stream
>>>>>>>>> multiplexing
>>>>>>>>> org.apache.beam.runners.core: Additional runners akin to the
>>>>>> DoFnRunner
>>>>>>>>> found in runners-core to support sources and gRPC endpoints
>>>>>>>>> org.apache.beam.fn.harness.logging: Logging client
>>> implementation
>>>>>> and
>>>>>>>> JUL
>>>>>>>>> logging handler adapter
>>>>>>>>> org.apache.beam.fn.harness.channel: gRPC channel management
>>>>>>>>> org.apache.beam.fn.harness.stream: gRPC stream management
>>>>>>>>> org.apache.beam.fn.harness.fn: Java 8 functional interface
>>>>>> extensions
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
>>>>>>> <klk@google.com.invalid
>>>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> This is awesome! Any chance you could roadmap the PR for us
>>> with
>>>>>> some
>>>>>>>>> links
>>>>>>>>>> into the most interesting bits?
>>>>>>>>>> 
>>>>>>>>>> On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>>>>>>>>>> robertwb@google.com.invalid> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Also, note that we can still support the "simple" case. For
>>>>>>> example,
>>>>>>>>>>> if the user supplies us with a jar file (as they do now) a
>>>>>> runner
>>>>>>>>>>> could launch it as a subprocesses and communicate with it
>>> via
>>>>>> this
>>>>>>>>>>> same Fn API or install it in a fixed container itself--the
>>>> user
>>>>>>>>>>> doesn't *need* to know about docker or manually manage
>>>>>> containers
>>>>>>>> (and
>>>>>>>>>>> indeed the Fn API could be used in-process, cross-process,
>>>>>>>>>>> cross-container, and even cross-machine).
>>>>>>>>>>> 
>>>>>>>>>>> However docker provides a nice cross-language way of
>>>> specifying
>>>>>> the
>>>>>>>>>>> environment including all dependencies (especially for
>>>> languages
>>>>>>> like
>>>>>>>>>>> Python or C where the equivalent of a cross-platform,
>>>>>>> self-contained
>>>>>>>>>>> jar isn't as easy to produce) and is strictly more powerful
>>>> and
>>>>>>>>>>> flexible (specifically it isolates the runtime environment
>>> and
>>>>>> one
>>>>>>>> can
>>>>>>>>>>> even use it for local testing).
>>>>>>>>>>> 
>>>>>>>>>>> Slicing a worker up like this without sacrificing
>>> performance
>>>>>> is an
>>>>>>>>>>> ambitious goal, but essential to the story of being able to
>>>> mix
>>>>>> and
>>>>>>>>>>> match runners and SDKs arbitrarily, and I think this is a
>>>> great
>>>>>>>> start.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
>>>>>>>> <lcwik@google.com.invalid
>>>>>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Your correct, a docker container is created that contains
>>>> the
>>>>>>>>> execution
>>>>>>>>>>>> environment the user wants or the user re-uses an
>>> existing
>>>> one
>>>>>>>>>> (allowing
>>>>>>>>>>>> for a user to embed all their code/dependencies or use a
>>>>>>> container
>>>>>>>>> that
>>>>>>>>>>> can
>>>>>>>>>>>> deploy code/dependencies on demand).
>>>>>>>>>>>> A user creates a pipeline saying which docker container
>>> they
>>>>>> want
>>>>>>>> to
>>>>>>>>>> use
>>>>>>>>>>>> (this starts to allow for multiple container definitions
>>>>>> within a
>>>>>>>>>> single
>>>>>>>>>>>> pipeline to support multiple languages, versioning, ...).
>>>>>>>>>>>> A runner would then be responsible for launching one or
>>> more
>>>>>> of
>>>>>>>> these
>>>>>>>>>>>> containers in a cluster manager of their choice (scaling
>>> up
>>>> or
>>>>>>> down
>>>>>>>>> the
>>>>>>>>>>>> number of instances depending on demand/load/...).
>>>>>>>>>>>> A runner then interacts with the docker containers over
>>> the
>>>>>> gRPC
>>>>>>>>>> service
>>>>>>>>>>>> definitions to delegate processing to.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
>>>>>>>>> jb@nanthrax.net
>>>>>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Luke,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> that's really great and very promising !
>>>>>>>>>>>>> 
>>>>>>>>>>>>> It's really ambitious but I like the idea. Just to
>>> clarify:
>>>>>> the
>>>>>>>>>> purpose
>>>>>>>>>>> of
>>>>>>>>>>>>> using gRPC is once the docker container is running,
>>> then we
>>>>>> can
>>>>>>>>>>> "interact"
>>>>>>>>>>>>> with the container to spread and delegate processing to
>>> the
>>>>>>> docker
>>>>>>>>>>>>> container, correct ?
>>>>>>>>>>>>> The users/devops have to setup the docker containers as
>>>>>>>>> prerequisite.
>>>>>>>>>>>>> Then, the "location" of the containers (kind of
>>> container
>>>>>>>> registry)
>>>>>>>>> is
>>>>>>>>>>> set
>>>>>>>>>>>>> via the pipeline options and used by gRPC ?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks Luke !
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> JB
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have been prototyping several components towards the
>>>> Beam
>>>>>>>>> technical
>>>>>>>>>>>>>> vision of being able to execute an arbitrary language
>>>> using
>>>>>> an
>>>>>>>>>>> arbitrary
>>>>>>>>>>>>>> runner.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I would like to share this overview [1] of what I have
>>>> been
>>>>>>>> working
>>>>>>>>>>>>>> towards. I also share this PR [2] with a proposed API,
>>>>>> service
>>>>>>>>>>> definitions
>>>>>>>>>>>>>> and partial implementation.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1: https://s.apache.org/beam-fn-api
>>>>>>>>>>>>>> 2: https://github.com/apache/beam/pull/1801
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Please comment on the overview within this thread, and
>>> any
>>>>>>>> specific
>>>>>>>>>>> code
>>>>>>>>>>>>>> comments on the PR directly.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Luke
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>> jbonofre@apache.org
>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 


Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
I would like to share another document about the Fn API. This document
specifically discusses how to access side inputs, access remote references
(e.g. large iterables for hot keys produced by a GBK), and support user
state.
https://s.apache.org/beam-fn-state-api-and-bundle-processing

The document does require a strong foundation in the Apache Beam model and
a good understanding of the prior shared docs:
* How to process a bundle: https://s.apache.org/beam-fn-api
-processing-a-bundle
* How to send and receive data: https://s.apache.org/beam-fn-api
-send-and-receive-data

I could really use the help of runner contributors to review the caching
semantics within the SDK harness and whether they would work well for the
runner they contribute to the most.

On Sun, May 21, 2017 at 6:40 PM, Lukasz Cwik <lc...@google.com> wrote:

> Manu, the goal is to share here initially, update the docs addressing
> people's comments, and then publish them on the website once they are
> stable enough.
>
> On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com>
> wrote:
>
>> Thanks Lukasz. The following two links were somehow incorrectly formatted
>> in your mail.
>>
>> * How to process a bundle:
>> https://s.apache.org/beam-fn-api-processing-a-bundle
>> * How to send and receive data:
>> https://s.apache.org/beam-fn-api-send-and-receive-data
>>
>> By the way, is there a way to find them from the Beam website ?
>>
>>
>> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>>
>> > Now that I'm back from vacation and the 2.0.0 release is not taking all
>> my
>> > time, I am focusing my attention on working on the Beam Portability
>> > framework, specifically the Fn API so that we can get Python and other
>> > language integrations work with any runner.
>> >
>> > For new comers, I would like to reshare the overview:
>> > https://s.apache.org/beam-fn-api
>> >
>> > And for those of you who have been following this thread and
>> contributors
>> > focusing on Runner integration with Apache Beam:
>> > * How to process a bundle: https://s.apache.org/beam-fn-a
>> pi-processing-a-
>> > bundle
>> > * How to send and receive data: https://s.apache.org/
>> > beam-fn-api-send-and-receive-data
>> >
>> > If you want to dive deeper, you should look at:
>> > * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>> > common/runner-api/src/main/proto/beam_runner_api.proto
>> > <https://github.com/apache/beam/blob/master/sdks/common/runn
>> er-api/src/main/proto/beam_runner_api.proto>
>> > * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
>> > common/fn-api/src/main/proto/beam_fn_api.proto
>> > <https://github.com/apache/beam/blob/master/sdks/common/fn-
>> api/src/main/proto/beam_fn_api.proto>
>> > * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>> > java/harness
>> > <https://github.com/apache/beam/tree/master/sdks/java/harness>
>> > * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/
>> > python/apache_beam/runners/worker
>> > <https://github.com/apache/beam/tree/master/sdks/python/apac
>> he_beam/runners/worker>
>> >
>> > Next I'm planning on talking about Beam Fn State API and will need help
>> > from Runner contributors to talk about caching semantics and key spaces
>> and
>> > whether the integrations mesh well with current Runner implementations.
>> The
>> > State API is meant to support user state, side inputs, and re-iteration
>> for
>> > large values produced by GroupByKey.
>> >
>> > On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > > Yes, I was using a Pipeline that was:
>> > > Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a
>> batch
>> > > pipeline in the global window using the default trigger)
>> > >
>> > > In Google Cloud Dataflow, the shuffle step uses the binary
>> representation
>> > > to compare keys, so the above pipeline would normally be converted to
>> the
>> > > following two stages:
>> > > Read -> GBK Writer
>> > > GBK Reader -> IdentityParDo
>> > >
>> > > Note that the GBK Writer and GBK Reader need to use a coder to encode
>> and
>> > > decode the value.
>> > >
>> > > When using the Fn API, those two stages expanded because of the Fn Api
>> > > crossings using a gRPC Write/Read pair:
>> > > Read -> gRPC Write -> gRPC Read -> GBK Writer
>> > > GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>> > >
>> > > In my naive prototype implementation, the coder was used to encode
>> > > elements at the gRPC steps. This meant that the coder was
>> > > encoding/decoding/encoding in the first stage and
>> > > decoding/encoding/decoding in the second stage. This tripled the
>> amount
>> > of
>> > > times the coder was being invoked per element. This additional use of
>> the
>> > > coder accounted for ~12% (80% of the 15%) of the extra execution time.
>> > This
>> > > implementation is quite inefficient and would benefit from merging the
>> > gRPC
>> > > Read + GBK Writer into one actor and also the GBK Reader + gRPC Write
>> > into
>> > > another actor allowing for the creation of a fast path that can skip
>> > parts
>> > > of the decode/encode cycle through the coder. By using a byte array
>> view
>> > > over the logical stream, one can minimize the number of byte array
>> copies
>> > > which plagued my naive implementation. This can be done by only
>> parsing
>> > the
>> > > element boundaries out of the stream to produce those logical byte
>> array
>> > > views. I have a very rough estimate that performing this optimization
>> > would
>> > > reduce the 12% overhead to somewhere between 4% and 6%.
>> > >
>> > > The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
>> > > marshalling/unmarshalling protos
>> > > handling/managing the socket
>> > > flow control
>> > > ...
>> > >
>> > > Finally, I did try experiments with different buffer sizes (10KB,
>> 100KB,
>> > > 1000KB), flow control (separate thread[1] vs same thread with
>> phaser[2]),
>> > > and channel type [3] (NIO, epoll, domain socket), but coder overhead
>> > easily
>> > > dominated the differences in these other experiments.
>> > >
>> > > Further analysis would need to be done to more accurately distill this
>> > > down.
>> > >
>> > > 1: https://github.com/lukecwik/incubator-beam/blob/
>> > > fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>> rness/stream/
>> > > BufferingStreamObserver.java
>> > > 2: https://github.com/lukecwik/incubator-beam/blob/
>> > > fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>> rness/stream/
>> > > DirectStreamObserver.java
>> > > 3: https://github.com/lukecwik/incubator-beam/blob/
>> > >
>> > fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/ha
>> rness/channel/
>> > > ManagedChannelFactory.java
>> > >
>> > >
>> > > On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com>
>> wrote:
>> > >
>> > >> Awesome job Lukasz, Excellent, I have to confess the first time I
>> heard
>> > >> about
>> > >> the Fn API idea I was a bit incredulous, but you are making it real,
>> > >> amazing!
>> > >>
>> > >> Just one question from your document, you said that 80% of the extra
>> > (15%)
>> > >> time
>> > >> goes into encoding and decoding the data for your test case, can you
>> > >> expand
>> > >> in
>> > >> your current ideas to improve this? (I am not sure I completely
>> > understand
>> > >> the
>> > >> issue).
>> > >>
>> > >>
>> > >> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik
>> <lc...@google.com.invalid>
>> > >> wrote:
>> > >>
>> > >> > Responded inline.
>> > >> >
>> > >> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com>
>> > >> wrote:
>> > >> >
>> > >> > > This is truly amazing Luke!
>> > >> > >
>> > >> > > If I understand this right, the runner executing the DoFn will
>> > >> delegate
>> > >> > the
>> > >> > > function code and input data (and state, coders, etc.) to the
>> > >> container
>> > >> > > where it will execute with the user's SDK of choice, right ?
>> > >> >
>> > >> >
>> > >> > Yes, that is correct.
>> > >> >
>> > >> >
>> > >> > > I wonder how the containers relate to the underlying engine's
>> worker
>> > >> > > processes ? is it a 1-1, container per worker ? if there's less
>> > "work"
>> > >> > for
>> > >> > > the worker's Java process (for example) now and it becomes a
>> sort of
>> > >> > > "dispatcher", would that change the resource allocation commonly
>> > used
>> > >> for
>> > >> > > the same Pipeline so that the worker processes would require less
>> > >> > > resources, while giving those to the container ?
>> > >> > >
>> > >> >
>> > >> > I think with the four services (control, data, state, logging) you
>> can
>> > >> go
>> > >> > with a 1-1 relationship or break it up more finely grained and
>> > dedicate
>> > >> > some machines to have specific tasks. Like you could have a few
>> > machines
>> > >> > dedicated to log aggregation which all the workers push their logs
>> to.
>> > >> > Similarly, you could have some machines that have a lot of memory
>> > which
>> > >> > would be better to be able to do shuffles in memory and then this
>> > >> cluster
>> > >> > of high memory machines could front the data service. I believe
>> there
>> > >> is a
>> > >> > lot of flexibility based upon what a runner can do and what it
>> > >> specializes
>> > >> > in and believe that with more effort comes more possibilities
>> albeit
>> > >> with
>> > >> > increased internal complexity.
>> > >> >
>> > >> > The layout of resources depends on whether the services and SDK
>> > >> containers
>> > >> > are co-hosted on the same machine or whether there is a different
>> > >> > architecture in play. In a co-hosted configuration, it seems likely
>> > that
>> > >> > the SDK container will get more resources but is dependent on the
>> > runner
>> > >> > and pipeline shape (shuffle heavy dominated pipelines will look
>> > >> different
>> > >> > then ParDo dominated pipelines).
>> > >> >
>> > >> >
>> > >> > > About executing sub-graphs, would it be true to say that as long
>> as
>> > >> > there's
>> > >> > > no shuffle, you could keep executing in the same container ?
>> meaning
>> > >> that
>> > >> > > the graph is broken into sub-graphs by shuffles ?
>> > >> > >
>> > >> >
>> > >> > The only thing that is required is that the Apache Beam model is
>> > >> preserved
>> > >> > so typical break points will be at shuffles and language crossing
>> > points
>> > >> > (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
>> > >> graph
>> > >> > even more for other reasons.
>> > >> >
>> > >> >
>> > >> > > I have to dig-in deeper, so I could have more questions ;-)
>> thanks
>> > >> Luke!
>> > >> > >
>> > >> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik
>> > <lcwik@google.com.invalid
>> > >> >
>> > >> > > wrote:
>> > >> > >
>> > >> > > > I updated the PR description to contain the same.
>> > >> > > >
>> > >> > > > I would start by looking at the API/object model definitions
>> found
>> > >> in
>> > >> > > > beam_fn_api.proto
>> > >> > > > <
>> > >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
>> > >> > > sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>> > >> > > > >
>> > >> > > >
>> > >> > > > Then depending on your interest, look at the following:
>> > >> > > > * FnHarness.java
>> > >> > > > <
>> > >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
>> > >> > > sdks/java/harness/src/main/java/org/apache/beam/fn/
>> > >> > harness/FnHarness.java
>> > >> > > > >
>> > >> > > > is the main entry point.
>> > >> > > > * org.apache.beam.fn.harness.data
>> > >> > > > <
>> > >> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
>> > >> > > sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
>> > >> > > > >
>> > >> > > > contains the most interesting bits of code since it is able to
>> > >> > multiplex
>> > >> > > a
>> > >> > > > gRPC stream into multiple logical streams of elements bound for
>> > >> > multiple
>> > >> > > > concurrent process bundle requests. It also contains the code
>> to
>> > >> take
>> > >> > > > multiple logical outbound streams and multiplex them back onto
>> a
>> > >> gRPC
>> > >> > > > stream.
>> > >> > > > * org.apache.beam.runners.core
>> > >> > > > <
>> > >> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
>> > >> > > sdks/java/harness/src/main/java/org/apache/beam/runners/core
>> > >> > > > >
>> > >> > > > contains additional runners akin to the DoFnRunner found in
>> > >> > runners-core
>> > >> > > to
>> > >> > > > support sources and gRPC endpoints.
>> > >> > > >
>> > >> > > > Unless your really interested in how domain sockets, epoll, nio
>> > >> channel
>> > >> > > > factories or how stream readiness callbacks work in gRPC, I
>> would
>> > >> avoid
>> > >> > > the
>> > >> > > > packages org.apache.beam.fn.harness.channel and
>> > >> > > > org.apache.beam.fn.harness.stream. Similarly I would avoid
>> > >> > > > org.apache.beam.fn.harness.fn and
>> org.apache.beam.fn.harness.fake
>> > >> as
>> > >> > > they
>> > >> > > > don't add anything meaningful to the api.
>> > >> > > >
>> > >> > > > Code package descriptions:
>> > >> > > >
>> > >> > > > org.apache.beam.fn.harness.FnHarness: main entry point
>> > >> > > > org.apache.beam.fn.harness.control: Control service client and
>> > >> > > individual
>> > >> > > > request handlers
>> > >> > > > org.apache.beam.fn.harness.data: Data service client and
>> logical
>> > >> > stream
>> > >> > > > multiplexing
>> > >> > > > org.apache.beam.runners.core: Additional runners akin to the
>> > >> DoFnRunner
>> > >> > > > found in runners-core to support sources and gRPC endpoints
>> > >> > > > org.apache.beam.fn.harness.logging: Logging client
>> implementation
>> > >> and
>> > >> > > JUL
>> > >> > > > logging handler adapter
>> > >> > > > org.apache.beam.fn.harness.channel: gRPC channel management
>> > >> > > > org.apache.beam.fn.harness.stream: gRPC stream management
>> > >> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface
>> > >> extensions
>> > >> > > >
>> > >> > > >
>> > >> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
>> > >> > <klk@google.com.invalid
>> > >> > > >
>> > >> > > > wrote:
>> > >> > > >
>> > >> > > > > This is awesome! Any chance you could roadmap the PR for us
>> with
>> > >> some
>> > >> > > > links
>> > >> > > > > into the most interesting bits?
>> > >> > > > >
>> > >> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>> > >> > > > > robertwb@google.com.invalid> wrote:
>> > >> > > > >
>> > >> > > > > > Also, note that we can still support the "simple" case. For
>> > >> > example,
>> > >> > > > > > if the user supplies us with a jar file (as they do now) a
>> > >> runner
>> > >> > > > > > could launch it as a subprocesses and communicate with it
>> via
>> > >> this
>> > >> > > > > > same Fn API or install it in a fixed container itself--the
>> > user
>> > >> > > > > > doesn't *need* to know about docker or manually manage
>> > >> containers
>> > >> > > (and
>> > >> > > > > > indeed the Fn API could be used in-process, cross-process,
>> > >> > > > > > cross-container, and even cross-machine).
>> > >> > > > > >
>> > >> > > > > > However docker provides a nice cross-language way of
>> > specifying
>> > >> the
>> > >> > > > > > environment including all dependencies (especially for
>> > languages
>> > >> > like
>> > >> > > > > > Python or C where the equivalent of a cross-platform,
>> > >> > self-contained
>> > >> > > > > > jar isn't as easy to produce) and is strictly more powerful
>> > and
>> > >> > > > > > flexible (specifically it isolates the runtime environment
>> and
>> > >> one
>> > >> > > can
>> > >> > > > > > even use it for local testing).
>> > >> > > > > >
>> > >> > > > > > Slicing a worker up like this without sacrificing
>> performance
>> > >> is an
>> > >> > > > > > ambitious goal, but essential to the story of being able to
>> > mix
>> > >> and
>> > >> > > > > > match runners and SDKs arbitrarily, and I think this is a
>> > great
>> > >> > > start.
>> > >> > > > > >
>> > >> > > > > >
>> > >> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
>> > >> > > <lcwik@google.com.invalid
>> > >> > > > >
>> > >> > > > > > wrote:
>> > >> > > > > > > Your correct, a docker container is created that contains
>> > the
>> > >> > > > execution
>> > >> > > > > > > environment the user wants or the user re-uses an
>> existing
>> > one
>> > >> > > > > (allowing
>> > >> > > > > > > for a user to embed all their code/dependencies or use a
>> > >> > container
>> > >> > > > that
>> > >> > > > > > can
>> > >> > > > > > > deploy code/dependencies on demand).
>> > >> > > > > > > A user creates a pipeline saying which docker container
>> they
>> > >> want
>> > >> > > to
>> > >> > > > > use
>> > >> > > > > > > (this starts to allow for multiple container definitions
>> > >> within a
>> > >> > > > > single
>> > >> > > > > > > pipeline to support multiple languages, versioning, ...).
>> > >> > > > > > > A runner would then be responsible for launching one or
>> more
>> > >> of
>> > >> > > these
>> > >> > > > > > > containers in a cluster manager of their choice (scaling
>> up
>> > or
>> > >> > down
>> > >> > > > the
>> > >> > > > > > > number of instances depending on demand/load/...).
>> > >> > > > > > > A runner then interacts with the docker containers over
>> the
>> > >> gRPC
>> > >> > > > > service
>> > >> > > > > > > definitions to delegate processing to.
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
>> > >> > > > jb@nanthrax.net
>> > >> > > > > >
>> > >> > > > > > > wrote:
>> > >> > > > > > >
>> > >> > > > > > >> Hi Luke,
>> > >> > > > > > >>
>> > >> > > > > > >> that's really great and very promising !
>> > >> > > > > > >>
>> > >> > > > > > >> It's really ambitious but I like the idea. Just to
>> clarify:
>> > >> the
>> > >> > > > > purpose
>> > >> > > > > > of
>> > >> > > > > > >> using gRPC is once the docker container is running,
>> then we
>> > >> can
>> > >> > > > > > "interact"
>> > >> > > > > > >> with the container to spread and delegate processing to
>> the
>> > >> > docker
>> > >> > > > > > >> container, correct ?
>> > >> > > > > > >> The users/devops have to setup the docker containers as
>> > >> > > > prerequisite.
>> > >> > > > > > >> Then, the "location" of the containers (kind of
>> container
>> > >> > > registry)
>> > >> > > > is
>> > >> > > > > > set
>> > >> > > > > > >> via the pipeline options and used by gRPC ?
>> > >> > > > > > >>
>> > >> > > > > > >> Thanks Luke !
>> > >> > > > > > >>
>> > >> > > > > > >> Regards
>> > >> > > > > > >> JB
>> > >> > > > > > >>
>> > >> > > > > > >>
>> > >> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>> > >> > > > > > >>
>> > >> > > > > > >>> I have been prototyping several components towards the
>> > Beam
>> > >> > > > technical
>> > >> > > > > > >>> vision of being able to execute an arbitrary language
>> > using
>> > >> an
>> > >> > > > > > arbitrary
>> > >> > > > > > >>> runner.
>> > >> > > > > > >>>
>> > >> > > > > > >>> I would like to share this overview [1] of what I have
>> > been
>> > >> > > working
>> > >> > > > > > >>> towards. I also share this PR [2] with a proposed API,
>> > >> service
>> > >> > > > > > definitions
>> > >> > > > > > >>> and partial implementation.
>> > >> > > > > > >>>
>> > >> > > > > > >>> 1: https://s.apache.org/beam-fn-api
>> > >> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
>> > >> > > > > > >>>
>> > >> > > > > > >>> Please comment on the overview within this thread, and
>> any
>> > >> > > specific
>> > >> > > > > > code
>> > >> > > > > > >>> comments on the PR directly.
>> > >> > > > > > >>>
>> > >> > > > > > >>> Luke
>> > >> > > > > > >>>
>> > >> > > > > > >>>
>> > >> > > > > > >> --
>> > >> > > > > > >> Jean-Baptiste Onofré
>> > >> > > > > > >> jbonofre@apache.org
>> > >> > > > > > >> http://blog.nanthrax.net
>> > >> > > > > > >> Talend - http://www.talend.com
>> > >> > > > > > >>
>> > >> > > > > >
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>

Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
Manu, the goal is to share here initially, update the docs addressing
people's comments, and then publish them on the website once they are
stable enough.

On Sun, May 21, 2017 at 5:54 PM, Manu Zhang <ow...@gmail.com> wrote:

> Thanks Lukasz. The following two links were somehow incorrectly formatted
> in your mail.
>
> * How to process a bundle:
> https://s.apache.org/beam-fn-api-processing-a-bundle
> * How to send and receive data:
> https://s.apache.org/beam-fn-api-send-and-receive-data
>
> By the way, is there a way to find them from the Beam website ?
>
>
> On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > Now that I'm back from vacation and the 2.0.0 release is not taking all
> my
> > time, I am focusing my attention on working on the Beam Portability
> > framework, specifically the Fn API so that we can get Python and other
> > language integrations work with any runner.
> >
> > For new comers, I would like to reshare the overview:
> > https://s.apache.org/beam-fn-api
> >
> > And for those of you who have been following this thread and contributors
> > focusing on Runner integration with Apache Beam:
> > * How to process a bundle: https://s.apache.org/beam-fn-
> api-processing-a-
> > bundle
> > * How to send and receive data: https://s.apache.org/
> > beam-fn-api-send-and-receive-data
> >
> > If you want to dive deeper, you should look at:
> > * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/
> > common/runner-api/src/main/proto/beam_runner_api.proto
> > <https://github.com/apache/beam/blob/master/sdks/common/
> runner-api/src/main/proto/beam_runner_api.proto>
> > * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
> > common/fn-api/src/main/proto/beam_fn_api.proto
> > <https://github.com/apache/beam/blob/master/sdks/common/
> fn-api/src/main/proto/beam_fn_api.proto>
> > * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
> > java/harness
> > <https://github.com/apache/beam/tree/master/sdks/java/harness>
> > * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/
> > python/apache_beam/runners/worker
> > <https://github.com/apache/beam/tree/master/sdks/python/
> apache_beam/runners/worker>
> >
> > Next I'm planning on talking about Beam Fn State API and will need help
> > from Runner contributors to talk about caching semantics and key spaces
> and
> > whether the integrations mesh well with current Runner implementations.
> The
> > State API is meant to support user state, side inputs, and re-iteration
> for
> > large values produced by GroupByKey.
> >
> > On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
> >
> > > Yes, I was using a Pipeline that was:
> > > Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a
> batch
> > > pipeline in the global window using the default trigger)
> > >
> > > In Google Cloud Dataflow, the shuffle step uses the binary
> representation
> > > to compare keys, so the above pipeline would normally be converted to
> the
> > > following two stages:
> > > Read -> GBK Writer
> > > GBK Reader -> IdentityParDo
> > >
> > > Note that the GBK Writer and GBK Reader need to use a coder to encode
> and
> > > decode the value.
> > >
> > > When using the Fn API, those two stages expanded because of the Fn Api
> > > crossings using a gRPC Write/Read pair:
> > > Read -> gRPC Write -> gRPC Read -> GBK Writer
> > > GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
> > >
> > > In my naive prototype implementation, the coder was used to encode
> > > elements at the gRPC steps. This meant that the coder was
> > > encoding/decoding/encoding in the first stage and
> > > decoding/encoding/decoding in the second stage. This tripled the amount
> > of
> > > times the coder was being invoked per element. This additional use of
> the
> > > coder accounted for ~12% (80% of the 15%) of the extra execution time.
> > This
> > > implementation is quite inefficient and would benefit from merging the
> > gRPC
> > > Read + GBK Writer into one actor and also the GBK Reader + gRPC Write
> > into
> > > another actor allowing for the creation of a fast path that can skip
> > parts
> > > of the decode/encode cycle through the coder. By using a byte array
> view
> > > over the logical stream, one can minimize the number of byte array
> copies
> > > which plagued my naive implementation. This can be done by only parsing
> > the
> > > element boundaries out of the stream to produce those logical byte
> array
> > > views. I have a very rough estimate that performing this optimization
> > would
> > > reduce the 12% overhead to somewhere between 4% and 6%.
> > >
> > > The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
> > > marshalling/unmarshalling protos
> > > handling/managing the socket
> > > flow control
> > > ...
> > >
> > > Finally, I did try experiments with different buffer sizes (10KB,
> 100KB,
> > > 1000KB), flow control (separate thread[1] vs same thread with
> phaser[2]),
> > > and channel type [3] (NIO, epoll, domain socket), but coder overhead
> > easily
> > > dominated the differences in these other experiments.
> > >
> > > Further analysis would need to be done to more accurately distill this
> > > down.
> > >
> > > 1: https://github.com/lukecwik/incubator-beam/blob/
> > > fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/
> harness/stream/
> > > BufferingStreamObserver.java
> > > 2: https://github.com/lukecwik/incubator-beam/blob/
> > > fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/
> harness/stream/
> > > DirectStreamObserver.java
> > > 3: https://github.com/lukecwik/incubator-beam/blob/
> > >
> > fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/
> harness/channel/
> > > ManagedChannelFactory.java
> > >
> > >
> > > On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com>
> wrote:
> > >
> > >> Awesome job Lukasz, Excellent, I have to confess the first time I
> heard
> > >> about
> > >> the Fn API idea I was a bit incredulous, but you are making it real,
> > >> amazing!
> > >>
> > >> Just one question from your document, you said that 80% of the extra
> > (15%)
> > >> time
> > >> goes into encoding and decoding the data for your test case, can you
> > >> expand
> > >> in
> > >> your current ideas to improve this? (I am not sure I completely
> > understand
> > >> the
> > >> issue).
> > >>
> > >>
> > >> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lcwik@google.com.invalid
> >
> > >> wrote:
> > >>
> > >> > Responded inline.
> > >> >
> > >> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com>
> > >> wrote:
> > >> >
> > >> > > This is truly amazing Luke!
> > >> > >
> > >> > > If I understand this right, the runner executing the DoFn will
> > >> delegate
> > >> > the
> > >> > > function code and input data (and state, coders, etc.) to the
> > >> container
> > >> > > where it will execute with the user's SDK of choice, right ?
> > >> >
> > >> >
> > >> > Yes, that is correct.
> > >> >
> > >> >
> > >> > > I wonder how the containers relate to the underlying engine's
> worker
> > >> > > processes ? is it a 1-1, container per worker ? if there's less
> > "work"
> > >> > for
> > >> > > the worker's Java process (for example) now and it becomes a sort
> of
> > >> > > "dispatcher", would that change the resource allocation commonly
> > used
> > >> for
> > >> > > the same Pipeline so that the worker processes would require less
> > >> > > resources, while giving those to the container ?
> > >> > >
> > >> >
> > >> > I think with the four services (control, data, state, logging) you
> can
> > >> go
> > >> > with a 1-1 relationship or break it up more finely grained and
> > dedicate
> > >> > some machines to have specific tasks. Like you could have a few
> > machines
> > >> > dedicated to log aggregation which all the workers push their logs
> to.
> > >> > Similarly, you could have some machines that have a lot of memory
> > which
> > >> > would be better to be able to do shuffles in memory and then this
> > >> cluster
> > >> > of high memory machines could front the data service. I believe
> there
> > >> is a
> > >> > lot of flexibility based upon what a runner can do and what it
> > >> specializes
> > >> > in and believe that with more effort comes more possibilities albeit
> > >> with
> > >> > increased internal complexity.
> > >> >
> > >> > The layout of resources depends on whether the services and SDK
> > >> containers
> > >> > are co-hosted on the same machine or whether there is a different
> > >> > architecture in play. In a co-hosted configuration, it seems likely
> > that
> > >> > the SDK container will get more resources but is dependent on the
> > runner
> > >> > and pipeline shape (shuffle heavy dominated pipelines will look
> > >> different
> > >> > then ParDo dominated pipelines).
> > >> >
> > >> >
> > >> > > About executing sub-graphs, would it be true to say that as long
> as
> > >> > there's
> > >> > > no shuffle, you could keep executing in the same container ?
> meaning
> > >> that
> > >> > > the graph is broken into sub-graphs by shuffles ?
> > >> > >
> > >> >
> > >> > The only thing that is required is that the Apache Beam model is
> > >> preserved
> > >> > so typical break points will be at shuffles and language crossing
> > points
> > >> > (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
> > >> graph
> > >> > even more for other reasons.
> > >> >
> > >> >
> > >> > > I have to dig-in deeper, so I could have more questions ;-) thanks
> > >> Luke!
> > >> > >
> > >> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik
> > <lcwik@google.com.invalid
> > >> >
> > >> > > wrote:
> > >> > >
> > >> > > > I updated the PR description to contain the same.
> > >> > > >
> > >> > > > I would start by looking at the API/object model definitions
> found
> > >> in
> > >> > > > beam_fn_api.proto
> > >> > > > <
> > >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> > >> > > sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> > >> > > > >
> > >> > > >
> > >> > > > Then depending on your interest, look at the following:
> > >> > > > * FnHarness.java
> > >> > > > <
> > >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> > >> > > sdks/java/harness/src/main/java/org/apache/beam/fn/
> > >> > harness/FnHarness.java
> > >> > > > >
> > >> > > > is the main entry point.
> > >> > > > * org.apache.beam.fn.harness.data
> > >> > > > <
> > >> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> > >> > > sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
> > >> > > > >
> > >> > > > contains the most interesting bits of code since it is able to
> > >> > multiplex
> > >> > > a
> > >> > > > gRPC stream into multiple logical streams of elements bound for
> > >> > multiple
> > >> > > > concurrent process bundle requests. It also contains the code to
> > >> take
> > >> > > > multiple logical outbound streams and multiplex them back onto a
> > >> gRPC
> > >> > > > stream.
> > >> > > > * org.apache.beam.runners.core
> > >> > > > <
> > >> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> > >> > > sdks/java/harness/src/main/java/org/apache/beam/runners/core
> > >> > > > >
> > >> > > > contains additional runners akin to the DoFnRunner found in
> > >> > runners-core
> > >> > > to
> > >> > > > support sources and gRPC endpoints.
> > >> > > >
> > >> > > > Unless your really interested in how domain sockets, epoll, nio
> > >> channel
> > >> > > > factories or how stream readiness callbacks work in gRPC, I
> would
> > >> avoid
> > >> > > the
> > >> > > > packages org.apache.beam.fn.harness.channel and
> > >> > > > org.apache.beam.fn.harness.stream. Similarly I would avoid
> > >> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.
> fake
> > >> as
> > >> > > they
> > >> > > > don't add anything meaningful to the api.
> > >> > > >
> > >> > > > Code package descriptions:
> > >> > > >
> > >> > > > org.apache.beam.fn.harness.FnHarness: main entry point
> > >> > > > org.apache.beam.fn.harness.control: Control service client and
> > >> > > individual
> > >> > > > request handlers
> > >> > > > org.apache.beam.fn.harness.data: Data service client and
> logical
> > >> > stream
> > >> > > > multiplexing
> > >> > > > org.apache.beam.runners.core: Additional runners akin to the
> > >> DoFnRunner
> > >> > > > found in runners-core to support sources and gRPC endpoints
> > >> > > > org.apache.beam.fn.harness.logging: Logging client
> implementation
> > >> and
> > >> > > JUL
> > >> > > > logging handler adapter
> > >> > > > org.apache.beam.fn.harness.channel: gRPC channel management
> > >> > > > org.apache.beam.fn.harness.stream: gRPC stream management
> > >> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface
> > >> extensions
> > >> > > >
> > >> > > >
> > >> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
> > >> > <klk@google.com.invalid
> > >> > > >
> > >> > > > wrote:
> > >> > > >
> > >> > > > > This is awesome! Any chance you could roadmap the PR for us
> with
> > >> some
> > >> > > > links
> > >> > > > > into the most interesting bits?
> > >> > > > >
> > >> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> > >> > > > > robertwb@google.com.invalid> wrote:
> > >> > > > >
> > >> > > > > > Also, note that we can still support the "simple" case. For
> > >> > example,
> > >> > > > > > if the user supplies us with a jar file (as they do now) a
> > >> runner
> > >> > > > > > could launch it as a subprocesses and communicate with it
> via
> > >> this
> > >> > > > > > same Fn API or install it in a fixed container itself--the
> > user
> > >> > > > > > doesn't *need* to know about docker or manually manage
> > >> containers
> > >> > > (and
> > >> > > > > > indeed the Fn API could be used in-process, cross-process,
> > >> > > > > > cross-container, and even cross-machine).
> > >> > > > > >
> > >> > > > > > However docker provides a nice cross-language way of
> > specifying
> > >> the
> > >> > > > > > environment including all dependencies (especially for
> > languages
> > >> > like
> > >> > > > > > Python or C where the equivalent of a cross-platform,
> > >> > self-contained
> > >> > > > > > jar isn't as easy to produce) and is strictly more powerful
> > and
> > >> > > > > > flexible (specifically it isolates the runtime environment
> and
> > >> one
> > >> > > can
> > >> > > > > > even use it for local testing).
> > >> > > > > >
> > >> > > > > > Slicing a worker up like this without sacrificing
> performance
> > >> is an
> > >> > > > > > ambitious goal, but essential to the story of being able to
> > mix
> > >> and
> > >> > > > > > match runners and SDKs arbitrarily, and I think this is a
> > great
> > >> > > start.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
> > >> > > <lcwik@google.com.invalid
> > >> > > > >
> > >> > > > > > wrote:
> > >> > > > > > > Your correct, a docker container is created that contains
> > the
> > >> > > > execution
> > >> > > > > > > environment the user wants or the user re-uses an existing
> > one
> > >> > > > > (allowing
> > >> > > > > > > for a user to embed all their code/dependencies or use a
> > >> > container
> > >> > > > that
> > >> > > > > > can
> > >> > > > > > > deploy code/dependencies on demand).
> > >> > > > > > > A user creates a pipeline saying which docker container
> they
> > >> want
> > >> > > to
> > >> > > > > use
> > >> > > > > > > (this starts to allow for multiple container definitions
> > >> within a
> > >> > > > > single
> > >> > > > > > > pipeline to support multiple languages, versioning, ...).
> > >> > > > > > > A runner would then be responsible for launching one or
> more
> > >> of
> > >> > > these
> > >> > > > > > > containers in a cluster manager of their choice (scaling
> up
> > or
> > >> > down
> > >> > > > the
> > >> > > > > > > number of instances depending on demand/load/...).
> > >> > > > > > > A runner then interacts with the docker containers over
> the
> > >> gRPC
> > >> > > > > service
> > >> > > > > > > definitions to delegate processing to.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
> > >> > > > jb@nanthrax.net
> > >> > > > > >
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > >> Hi Luke,
> > >> > > > > > >>
> > >> > > > > > >> that's really great and very promising !
> > >> > > > > > >>
> > >> > > > > > >> It's really ambitious but I like the idea. Just to
> clarify:
> > >> the
> > >> > > > > purpose
> > >> > > > > > of
> > >> > > > > > >> using gRPC is once the docker container is running, then
> we
> > >> can
> > >> > > > > > "interact"
> > >> > > > > > >> with the container to spread and delegate processing to
> the
> > >> > docker
> > >> > > > > > >> container, correct ?
> > >> > > > > > >> The users/devops have to setup the docker containers as
> > >> > > > prerequisite.
> > >> > > > > > >> Then, the "location" of the containers (kind of container
> > >> > > registry)
> > >> > > > is
> > >> > > > > > set
> > >> > > > > > >> via the pipeline options and used by gRPC ?
> > >> > > > > > >>
> > >> > > > > > >> Thanks Luke !
> > >> > > > > > >>
> > >> > > > > > >> Regards
> > >> > > > > > >> JB
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> > >> > > > > > >>
> > >> > > > > > >>> I have been prototyping several components towards the
> > Beam
> > >> > > > technical
> > >> > > > > > >>> vision of being able to execute an arbitrary language
> > using
> > >> an
> > >> > > > > > arbitrary
> > >> > > > > > >>> runner.
> > >> > > > > > >>>
> > >> > > > > > >>> I would like to share this overview [1] of what I have
> > been
> > >> > > working
> > >> > > > > > >>> towards. I also share this PR [2] with a proposed API,
> > >> service
> > >> > > > > > definitions
> > >> > > > > > >>> and partial implementation.
> > >> > > > > > >>>
> > >> > > > > > >>> 1: https://s.apache.org/beam-fn-api
> > >> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
> > >> > > > > > >>>
> > >> > > > > > >>> Please comment on the overview within this thread, and
> any
> > >> > > specific
> > >> > > > > > code
> > >> > > > > > >>> comments on the PR directly.
> > >> > > > > > >>>
> > >> > > > > > >>> Luke
> > >> > > > > > >>>
> > >> > > > > > >>>
> > >> > > > > > >> --
> > >> > > > > > >> Jean-Baptiste Onofré
> > >> > > > > > >> jbonofre@apache.org
> > >> > > > > > >> http://blog.nanthrax.net
> > >> > > > > > >> Talend - http://www.talend.com
> > >> > > > > > >>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Re: Beam Fn API

Posted by Manu Zhang <ow...@gmail.com>.
Thanks Lukasz. The following two links were somehow incorrectly formatted
in your mail.

* How to process a bundle:
https://s.apache.org/beam-fn-api-processing-a-bundle
* How to send and receive data:
https://s.apache.org/beam-fn-api-send-and-receive-data

By the way, is there a way to find them from the Beam website ?


On Fri, May 19, 2017 at 6:44 AM Lukasz Cwik <lc...@google.com.invalid>
wrote:

> Now that I'm back from vacation and the 2.0.0 release is not taking all my
> time, I am focusing my attention on working on the Beam Portability
> framework, specifically the Fn API so that we can get Python and other
> language integrations work with any runner.
>
> For new comers, I would like to reshare the overview:
> https://s.apache.org/beam-fn-api
>
> And for those of you who have been following this thread and contributors
> focusing on Runner integration with Apache Beam:
> * How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-
> bundle
> * How to send and receive data: https://s.apache.org/
> beam-fn-api-send-and-receive-data
>
> If you want to dive deeper, you should look at:
> * Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/
> common/runner-api/src/main/proto/beam_runner_api.proto
> <https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto>
> * Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
> common/fn-api/src/main/proto/beam_fn_api.proto
> <https://github.com/apache/beam/blob/master/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>
> * Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
> java/harness
> <https://github.com/apache/beam/tree/master/sdks/java/harness>
> * Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/
> python/apache_beam/runners/worker
> <https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/worker>
>
> Next I'm planning on talking about Beam Fn State API and will need help
> from Runner contributors to talk about caching semantics and key spaces and
> whether the integrations mesh well with current Runner implementations. The
> State API is meant to support user state, side inputs, and re-iteration for
> large values produced by GroupByKey.
>
> On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:
>
> > Yes, I was using a Pipeline that was:
> > Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
> > pipeline in the global window using the default trigger)
> >
> > In Google Cloud Dataflow, the shuffle step uses the binary representation
> > to compare keys, so the above pipeline would normally be converted to the
> > following two stages:
> > Read -> GBK Writer
> > GBK Reader -> IdentityParDo
> >
> > Note that the GBK Writer and GBK Reader need to use a coder to encode and
> > decode the value.
> >
> > When using the Fn API, those two stages expanded because of the Fn Api
> > crossings using a gRPC Write/Read pair:
> > Read -> gRPC Write -> gRPC Read -> GBK Writer
> > GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
> >
> > In my naive prototype implementation, the coder was used to encode
> > elements at the gRPC steps. This meant that the coder was
> > encoding/decoding/encoding in the first stage and
> > decoding/encoding/decoding in the second stage. This tripled the amount
> of
> > times the coder was being invoked per element. This additional use of the
> > coder accounted for ~12% (80% of the 15%) of the extra execution time.
> This
> > implementation is quite inefficient and would benefit from merging the
> gRPC
> > Read + GBK Writer into one actor and also the GBK Reader + gRPC Write
> into
> > another actor allowing for the creation of a fast path that can skip
> parts
> > of the decode/encode cycle through the coder. By using a byte array view
> > over the logical stream, one can minimize the number of byte array copies
> > which plagued my naive implementation. This can be done by only parsing
> the
> > element boundaries out of the stream to produce those logical byte array
> > views. I have a very rough estimate that performing this optimization
> would
> > reduce the 12% overhead to somewhere between 4% and 6%.
> >
> > The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
> > marshalling/unmarshalling protos
> > handling/managing the socket
> > flow control
> > ...
> >
> > Finally, I did try experiments with different buffer sizes (10KB, 100KB,
> > 1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
> > and channel type [3] (NIO, epoll, domain socket), but coder overhead
> easily
> > dominated the differences in these other experiments.
> >
> > Further analysis would need to be done to more accurately distill this
> > down.
> >
> > 1: https://github.com/lukecwik/incubator-beam/blob/
> > fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/
> > BufferingStreamObserver.java
> > 2: https://github.com/lukecwik/incubator-beam/blob/
> > fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/
> > DirectStreamObserver.java
> > 3: https://github.com/lukecwik/incubator-beam/blob/
> >
> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/
> > ManagedChannelFactory.java
> >
> >
> > On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:
> >
> >> Awesome job Lukasz, Excellent, I have to confess the first time I heard
> >> about
> >> the Fn API idea I was a bit incredulous, but you are making it real,
> >> amazing!
> >>
> >> Just one question from your document, you said that 80% of the extra
> (15%)
> >> time
> >> goes into encoding and decoding the data for your test case, can you
> >> expand
> >> in
> >> your current ideas to improve this? (I am not sure I completely
> understand
> >> the
> >> issue).
> >>
> >>
> >> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid>
> >> wrote:
> >>
> >> > Responded inline.
> >> >
> >> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com>
> >> wrote:
> >> >
> >> > > This is truly amazing Luke!
> >> > >
> >> > > If I understand this right, the runner executing the DoFn will
> >> delegate
> >> > the
> >> > > function code and input data (and state, coders, etc.) to the
> >> container
> >> > > where it will execute with the user's SDK of choice, right ?
> >> >
> >> >
> >> > Yes, that is correct.
> >> >
> >> >
> >> > > I wonder how the containers relate to the underlying engine's worker
> >> > > processes ? is it a 1-1, container per worker ? if there's less
> "work"
> >> > for
> >> > > the worker's Java process (for example) now and it becomes a sort of
> >> > > "dispatcher", would that change the resource allocation commonly
> used
> >> for
> >> > > the same Pipeline so that the worker processes would require less
> >> > > resources, while giving those to the container ?
> >> > >
> >> >
> >> > I think with the four services (control, data, state, logging) you can
> >> go
> >> > with a 1-1 relationship or break it up more finely grained and
> dedicate
> >> > some machines to have specific tasks. Like you could have a few
> machines
> >> > dedicated to log aggregation which all the workers push their logs to.
> >> > Similarly, you could have some machines that have a lot of memory
> which
> >> > would be better to be able to do shuffles in memory and then this
> >> cluster
> >> > of high memory machines could front the data service. I believe there
> >> is a
> >> > lot of flexibility based upon what a runner can do and what it
> >> specializes
> >> > in and believe that with more effort comes more possibilities albeit
> >> with
> >> > increased internal complexity.
> >> >
> >> > The layout of resources depends on whether the services and SDK
> >> containers
> >> > are co-hosted on the same machine or whether there is a different
> >> > architecture in play. In a co-hosted configuration, it seems likely
> that
> >> > the SDK container will get more resources but is dependent on the
> runner
> >> > and pipeline shape (shuffle heavy dominated pipelines will look
> >> different
> >> > then ParDo dominated pipelines).
> >> >
> >> >
> >> > > About executing sub-graphs, would it be true to say that as long as
> >> > there's
> >> > > no shuffle, you could keep executing in the same container ? meaning
> >> that
> >> > > the graph is broken into sub-graphs by shuffles ?
> >> > >
> >> >
> >> > The only thing that is required is that the Apache Beam model is
> >> preserved
> >> > so typical break points will be at shuffles and language crossing
> points
> >> > (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
> >> graph
> >> > even more for other reasons.
> >> >
> >> >
> >> > > I have to dig-in deeper, so I could have more questions ;-) thanks
> >> Luke!
> >> > >
> >> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik
> <lcwik@google.com.invalid
> >> >
> >> > > wrote:
> >> > >
> >> > > > I updated the PR description to contain the same.
> >> > > >
> >> > > > I would start by looking at the API/object model definitions found
> >> in
> >> > > > beam_fn_api.proto
> >> > > > <
> >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> >> > > sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> >> > > > >
> >> > > >
> >> > > > Then depending on your interest, look at the following:
> >> > > > * FnHarness.java
> >> > > > <
> >> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> >> > > sdks/java/harness/src/main/java/org/apache/beam/fn/
> >> > harness/FnHarness.java
> >> > > > >
> >> > > > is the main entry point.
> >> > > > * org.apache.beam.fn.harness.data
> >> > > > <
> >> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> >> > > sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
> >> > > > >
> >> > > > contains the most interesting bits of code since it is able to
> >> > multiplex
> >> > > a
> >> > > > gRPC stream into multiple logical streams of elements bound for
> >> > multiple
> >> > > > concurrent process bundle requests. It also contains the code to
> >> take
> >> > > > multiple logical outbound streams and multiplex them back onto a
> >> gRPC
> >> > > > stream.
> >> > > > * org.apache.beam.runners.core
> >> > > > <
> >> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> >> > > sdks/java/harness/src/main/java/org/apache/beam/runners/core
> >> > > > >
> >> > > > contains additional runners akin to the DoFnRunner found in
> >> > runners-core
> >> > > to
> >> > > > support sources and gRPC endpoints.
> >> > > >
> >> > > > Unless your really interested in how domain sockets, epoll, nio
> >> channel
> >> > > > factories or how stream readiness callbacks work in gRPC, I would
> >> avoid
> >> > > the
> >> > > > packages org.apache.beam.fn.harness.channel and
> >> > > > org.apache.beam.fn.harness.stream. Similarly I would avoid
> >> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake
> >> as
> >> > > they
> >> > > > don't add anything meaningful to the api.
> >> > > >
> >> > > > Code package descriptions:
> >> > > >
> >> > > > org.apache.beam.fn.harness.FnHarness: main entry point
> >> > > > org.apache.beam.fn.harness.control: Control service client and
> >> > > individual
> >> > > > request handlers
> >> > > > org.apache.beam.fn.harness.data: Data service client and logical
> >> > stream
> >> > > > multiplexing
> >> > > > org.apache.beam.runners.core: Additional runners akin to the
> >> DoFnRunner
> >> > > > found in runners-core to support sources and gRPC endpoints
> >> > > > org.apache.beam.fn.harness.logging: Logging client implementation
> >> and
> >> > > JUL
> >> > > > logging handler adapter
> >> > > > org.apache.beam.fn.harness.channel: gRPC channel management
> >> > > > org.apache.beam.fn.harness.stream: gRPC stream management
> >> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface
> >> extensions
> >> > > >
> >> > > >
> >> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
> >> > <klk@google.com.invalid
> >> > > >
> >> > > > wrote:
> >> > > >
> >> > > > > This is awesome! Any chance you could roadmap the PR for us with
> >> some
> >> > > > links
> >> > > > > into the most interesting bits?
> >> > > > >
> >> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> >> > > > > robertwb@google.com.invalid> wrote:
> >> > > > >
> >> > > > > > Also, note that we can still support the "simple" case. For
> >> > example,
> >> > > > > > if the user supplies us with a jar file (as they do now) a
> >> runner
> >> > > > > > could launch it as a subprocesses and communicate with it via
> >> this
> >> > > > > > same Fn API or install it in a fixed container itself--the
> user
> >> > > > > > doesn't *need* to know about docker or manually manage
> >> containers
> >> > > (and
> >> > > > > > indeed the Fn API could be used in-process, cross-process,
> >> > > > > > cross-container, and even cross-machine).
> >> > > > > >
> >> > > > > > However docker provides a nice cross-language way of
> specifying
> >> the
> >> > > > > > environment including all dependencies (especially for
> languages
> >> > like
> >> > > > > > Python or C where the equivalent of a cross-platform,
> >> > self-contained
> >> > > > > > jar isn't as easy to produce) and is strictly more powerful
> and
> >> > > > > > flexible (specifically it isolates the runtime environment and
> >> one
> >> > > can
> >> > > > > > even use it for local testing).
> >> > > > > >
> >> > > > > > Slicing a worker up like this without sacrificing performance
> >> is an
> >> > > > > > ambitious goal, but essential to the story of being able to
> mix
> >> and
> >> > > > > > match runners and SDKs arbitrarily, and I think this is a
> great
> >> > > start.
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
> >> > > <lcwik@google.com.invalid
> >> > > > >
> >> > > > > > wrote:
> >> > > > > > > Your correct, a docker container is created that contains
> the
> >> > > > execution
> >> > > > > > > environment the user wants or the user re-uses an existing
> one
> >> > > > > (allowing
> >> > > > > > > for a user to embed all their code/dependencies or use a
> >> > container
> >> > > > that
> >> > > > > > can
> >> > > > > > > deploy code/dependencies on demand).
> >> > > > > > > A user creates a pipeline saying which docker container they
> >> want
> >> > > to
> >> > > > > use
> >> > > > > > > (this starts to allow for multiple container definitions
> >> within a
> >> > > > > single
> >> > > > > > > pipeline to support multiple languages, versioning, ...).
> >> > > > > > > A runner would then be responsible for launching one or more
> >> of
> >> > > these
> >> > > > > > > containers in a cluster manager of their choice (scaling up
> or
> >> > down
> >> > > > the
> >> > > > > > > number of instances depending on demand/load/...).
> >> > > > > > > A runner then interacts with the docker containers over the
> >> gRPC
> >> > > > > service
> >> > > > > > > definitions to delegate processing to.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
> >> > > > jb@nanthrax.net
> >> > > > > >
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> Hi Luke,
> >> > > > > > >>
> >> > > > > > >> that's really great and very promising !
> >> > > > > > >>
> >> > > > > > >> It's really ambitious but I like the idea. Just to clarify:
> >> the
> >> > > > > purpose
> >> > > > > > of
> >> > > > > > >> using gRPC is once the docker container is running, then we
> >> can
> >> > > > > > "interact"
> >> > > > > > >> with the container to spread and delegate processing to the
> >> > docker
> >> > > > > > >> container, correct ?
> >> > > > > > >> The users/devops have to setup the docker containers as
> >> > > > prerequisite.
> >> > > > > > >> Then, the "location" of the containers (kind of container
> >> > > registry)
> >> > > > is
> >> > > > > > set
> >> > > > > > >> via the pipeline options and used by gRPC ?
> >> > > > > > >>
> >> > > > > > >> Thanks Luke !
> >> > > > > > >>
> >> > > > > > >> Regards
> >> > > > > > >> JB
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> >> > > > > > >>
> >> > > > > > >>> I have been prototyping several components towards the
> Beam
> >> > > > technical
> >> > > > > > >>> vision of being able to execute an arbitrary language
> using
> >> an
> >> > > > > > arbitrary
> >> > > > > > >>> runner.
> >> > > > > > >>>
> >> > > > > > >>> I would like to share this overview [1] of what I have
> been
> >> > > working
> >> > > > > > >>> towards. I also share this PR [2] with a proposed API,
> >> service
> >> > > > > > definitions
> >> > > > > > >>> and partial implementation.
> >> > > > > > >>>
> >> > > > > > >>> 1: https://s.apache.org/beam-fn-api
> >> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
> >> > > > > > >>>
> >> > > > > > >>> Please comment on the overview within this thread, and any
> >> > > specific
> >> > > > > > code
> >> > > > > > >>> comments on the PR directly.
> >> > > > > > >>>
> >> > > > > > >>> Luke
> >> > > > > > >>>
> >> > > > > > >>>
> >> > > > > > >> --
> >> > > > > > >> Jean-Baptiste Onofré
> >> > > > > > >> jbonofre@apache.org
> >> > > > > > >> http://blog.nanthrax.net
> >> > > > > > >> Talend - http://www.talend.com
> >> > > > > > >>
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
Now that I'm back from vacation and the 2.0.0 release is not taking all my
time, I am focusing my attention on working on the Beam Portability
framework, specifically the Fn API so that we can get Python and other
language integrations work with any runner.

For new comers, I would like to reshare the overview:
https://s.apache.org/beam-fn-api

And for those of you who have been following this thread and contributors
focusing on Runner integration with Apache Beam:
* How to process a bundle: https://s.apache.org/beam-fn-api-processing-a-
bundle
* How to send and receive data: https://s.apache.org/
beam-fn-api-send-and-receive-data

If you want to dive deeper, you should look at:
* Runner API Protobuf: https://github.com/apache/beam/blob/master/sdks/
common/runner-api/src/main/proto/beam_runner_api.proto
* Fn API Protobuf: https://github.com/apache/beam/blob/master/sdks/
common/fn-api/src/main/proto/beam_fn_api.proto
* Java SDK Harness: https://github.com/apache/beam/tree/master/sdks/
java/harness
* Python SDK Harness: https://github.com/apache/beam/tree/master/sdks/
python/apache_beam/runners/worker

Next I'm planning on talking about Beam Fn State API and will need help
from Runner contributors to talk about caching semantics and key spaces and
whether the integrations mesh well with current Runner implementations. The
State API is meant to support user state, side inputs, and re-iteration for
large values produced by GroupByKey.

On Tue, Jan 24, 2017 at 9:46 AM, Lukasz Cwik <lc...@google.com> wrote:

> Yes, I was using a Pipeline that was:
> Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
> pipeline in the global window using the default trigger)
>
> In Google Cloud Dataflow, the shuffle step uses the binary representation
> to compare keys, so the above pipeline would normally be converted to the
> following two stages:
> Read -> GBK Writer
> GBK Reader -> IdentityParDo
>
> Note that the GBK Writer and GBK Reader need to use a coder to encode and
> decode the value.
>
> When using the Fn API, those two stages expanded because of the Fn Api
> crossings using a gRPC Write/Read pair:
> Read -> gRPC Write -> gRPC Read -> GBK Writer
> GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo
>
> In my naive prototype implementation, the coder was used to encode
> elements at the gRPC steps. This meant that the coder was
> encoding/decoding/encoding in the first stage and
> decoding/encoding/decoding in the second stage. This tripled the amount of
> times the coder was being invoked per element. This additional use of the
> coder accounted for ~12% (80% of the 15%) of the extra execution time. This
> implementation is quite inefficient and would benefit from merging the gRPC
> Read + GBK Writer into one actor and also the GBK Reader + gRPC Write into
> another actor allowing for the creation of a fast path that can skip parts
> of the decode/encode cycle through the coder. By using a byte array view
> over the logical stream, one can minimize the number of byte array copies
> which plagued my naive implementation. This can be done by only parsing the
> element boundaries out of the stream to produce those logical byte array
> views. I have a very rough estimate that performing this optimization would
> reduce the 12% overhead to somewhere between 4% and 6%.
>
> The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
> marshalling/unmarshalling protos
> handling/managing the socket
> flow control
> ...
>
> Finally, I did try experiments with different buffer sizes (10KB, 100KB,
> 1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
> and channel type [3] (NIO, epoll, domain socket), but coder overhead easily
> dominated the differences in these other experiments.
>
> Further analysis would need to be done to more accurately distill this
> down.
>
> 1: https://github.com/lukecwik/incubator-beam/blob/
> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/
> BufferingStreamObserver.java
> 2: https://github.com/lukecwik/incubator-beam/blob/
> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/
> DirectStreamObserver.java
> 3: https://github.com/lukecwik/incubator-beam/blob/
> fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/
> ManagedChannelFactory.java
>
>
> On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Awesome job Lukasz, Excellent, I have to confess the first time I heard
>> about
>> the Fn API idea I was a bit incredulous, but you are making it real,
>> amazing!
>>
>> Just one question from your document, you said that 80% of the extra (15%)
>> time
>> goes into encoding and decoding the data for your test case, can you
>> expand
>> in
>> your current ideas to improve this? (I am not sure I completely understand
>> the
>> issue).
>>
>>
>> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>>
>> > Responded inline.
>> >
>> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com>
>> wrote:
>> >
>> > > This is truly amazing Luke!
>> > >
>> > > If I understand this right, the runner executing the DoFn will
>> delegate
>> > the
>> > > function code and input data (and state, coders, etc.) to the
>> container
>> > > where it will execute with the user's SDK of choice, right ?
>> >
>> >
>> > Yes, that is correct.
>> >
>> >
>> > > I wonder how the containers relate to the underlying engine's worker
>> > > processes ? is it a 1-1, container per worker ? if there's less "work"
>> > for
>> > > the worker's Java process (for example) now and it becomes a sort of
>> > > "dispatcher", would that change the resource allocation commonly used
>> for
>> > > the same Pipeline so that the worker processes would require less
>> > > resources, while giving those to the container ?
>> > >
>> >
>> > I think with the four services (control, data, state, logging) you can
>> go
>> > with a 1-1 relationship or break it up more finely grained and dedicate
>> > some machines to have specific tasks. Like you could have a few machines
>> > dedicated to log aggregation which all the workers push their logs to.
>> > Similarly, you could have some machines that have a lot of memory which
>> > would be better to be able to do shuffles in memory and then this
>> cluster
>> > of high memory machines could front the data service. I believe there
>> is a
>> > lot of flexibility based upon what a runner can do and what it
>> specializes
>> > in and believe that with more effort comes more possibilities albeit
>> with
>> > increased internal complexity.
>> >
>> > The layout of resources depends on whether the services and SDK
>> containers
>> > are co-hosted on the same machine or whether there is a different
>> > architecture in play. In a co-hosted configuration, it seems likely that
>> > the SDK container will get more resources but is dependent on the runner
>> > and pipeline shape (shuffle heavy dominated pipelines will look
>> different
>> > then ParDo dominated pipelines).
>> >
>> >
>> > > About executing sub-graphs, would it be true to say that as long as
>> > there's
>> > > no shuffle, you could keep executing in the same container ? meaning
>> that
>> > > the graph is broken into sub-graphs by shuffles ?
>> > >
>> >
>> > The only thing that is required is that the Apache Beam model is
>> preserved
>> > so typical break points will be at shuffles and language crossing points
>> > (e.g. Python ParDo -> Java ParDo). A runner is free to break up the
>> graph
>> > even more for other reasons.
>> >
>> >
>> > > I have to dig-in deeper, so I could have more questions ;-) thanks
>> Luke!
>> > >
>> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lcwik@google.com.invalid
>> >
>> > > wrote:
>> > >
>> > > > I updated the PR description to contain the same.
>> > > >
>> > > > I would start by looking at the API/object model definitions found
>> in
>> > > > beam_fn_api.proto
>> > > > <
>> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
>> > > sdks/common/fn-api/src/main/proto/beam_fn_api.proto
>> > > > >
>> > > >
>> > > > Then depending on your interest, look at the following:
>> > > > * FnHarness.java
>> > > > <
>> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
>> > > sdks/java/harness/src/main/java/org/apache/beam/fn/
>> > harness/FnHarness.java
>> > > > >
>> > > > is the main entry point.
>> > > > * org.apache.beam.fn.harness.data
>> > > > <
>> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
>> > > sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
>> > > > >
>> > > > contains the most interesting bits of code since it is able to
>> > multiplex
>> > > a
>> > > > gRPC stream into multiple logical streams of elements bound for
>> > multiple
>> > > > concurrent process bundle requests. It also contains the code to
>> take
>> > > > multiple logical outbound streams and multiplex them back onto a
>> gRPC
>> > > > stream.
>> > > > * org.apache.beam.runners.core
>> > > > <
>> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
>> > > sdks/java/harness/src/main/java/org/apache/beam/runners/core
>> > > > >
>> > > > contains additional runners akin to the DoFnRunner found in
>> > runners-core
>> > > to
>> > > > support sources and gRPC endpoints.
>> > > >
>> > > > Unless your really interested in how domain sockets, epoll, nio
>> channel
>> > > > factories or how stream readiness callbacks work in gRPC, I would
>> avoid
>> > > the
>> > > > packages org.apache.beam.fn.harness.channel and
>> > > > org.apache.beam.fn.harness.stream. Similarly I would avoid
>> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake
>> as
>> > > they
>> > > > don't add anything meaningful to the api.
>> > > >
>> > > > Code package descriptions:
>> > > >
>> > > > org.apache.beam.fn.harness.FnHarness: main entry point
>> > > > org.apache.beam.fn.harness.control: Control service client and
>> > > individual
>> > > > request handlers
>> > > > org.apache.beam.fn.harness.data: Data service client and logical
>> > stream
>> > > > multiplexing
>> > > > org.apache.beam.runners.core: Additional runners akin to the
>> DoFnRunner
>> > > > found in runners-core to support sources and gRPC endpoints
>> > > > org.apache.beam.fn.harness.logging: Logging client implementation
>> and
>> > > JUL
>> > > > logging handler adapter
>> > > > org.apache.beam.fn.harness.channel: gRPC channel management
>> > > > org.apache.beam.fn.harness.stream: gRPC stream management
>> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface
>> extensions
>> > > >
>> > > >
>> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
>> > <klk@google.com.invalid
>> > > >
>> > > > wrote:
>> > > >
>> > > > > This is awesome! Any chance you could roadmap the PR for us with
>> some
>> > > > links
>> > > > > into the most interesting bits?
>> > > > >
>> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
>> > > > > robertwb@google.com.invalid> wrote:
>> > > > >
>> > > > > > Also, note that we can still support the "simple" case. For
>> > example,
>> > > > > > if the user supplies us with a jar file (as they do now) a
>> runner
>> > > > > > could launch it as a subprocesses and communicate with it via
>> this
>> > > > > > same Fn API or install it in a fixed container itself--the user
>> > > > > > doesn't *need* to know about docker or manually manage
>> containers
>> > > (and
>> > > > > > indeed the Fn API could be used in-process, cross-process,
>> > > > > > cross-container, and even cross-machine).
>> > > > > >
>> > > > > > However docker provides a nice cross-language way of specifying
>> the
>> > > > > > environment including all dependencies (especially for languages
>> > like
>> > > > > > Python or C where the equivalent of a cross-platform,
>> > self-contained
>> > > > > > jar isn't as easy to produce) and is strictly more powerful and
>> > > > > > flexible (specifically it isolates the runtime environment and
>> one
>> > > can
>> > > > > > even use it for local testing).
>> > > > > >
>> > > > > > Slicing a worker up like this without sacrificing performance
>> is an
>> > > > > > ambitious goal, but essential to the story of being able to mix
>> and
>> > > > > > match runners and SDKs arbitrarily, and I think this is a great
>> > > start.
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
>> > > <lcwik@google.com.invalid
>> > > > >
>> > > > > > wrote:
>> > > > > > > Your correct, a docker container is created that contains the
>> > > > execution
>> > > > > > > environment the user wants or the user re-uses an existing one
>> > > > > (allowing
>> > > > > > > for a user to embed all their code/dependencies or use a
>> > container
>> > > > that
>> > > > > > can
>> > > > > > > deploy code/dependencies on demand).
>> > > > > > > A user creates a pipeline saying which docker container they
>> want
>> > > to
>> > > > > use
>> > > > > > > (this starts to allow for multiple container definitions
>> within a
>> > > > > single
>> > > > > > > pipeline to support multiple languages, versioning, ...).
>> > > > > > > A runner would then be responsible for launching one or more
>> of
>> > > these
>> > > > > > > containers in a cluster manager of their choice (scaling up or
>> > down
>> > > > the
>> > > > > > > number of instances depending on demand/load/...).
>> > > > > > > A runner then interacts with the docker containers over the
>> gRPC
>> > > > > service
>> > > > > > > definitions to delegate processing to.
>> > > > > > >
>> > > > > > >
>> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
>> > > > jb@nanthrax.net
>> > > > > >
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > >> Hi Luke,
>> > > > > > >>
>> > > > > > >> that's really great and very promising !
>> > > > > > >>
>> > > > > > >> It's really ambitious but I like the idea. Just to clarify:
>> the
>> > > > > purpose
>> > > > > > of
>> > > > > > >> using gRPC is once the docker container is running, then we
>> can
>> > > > > > "interact"
>> > > > > > >> with the container to spread and delegate processing to the
>> > docker
>> > > > > > >> container, correct ?
>> > > > > > >> The users/devops have to setup the docker containers as
>> > > > prerequisite.
>> > > > > > >> Then, the "location" of the containers (kind of container
>> > > registry)
>> > > > is
>> > > > > > set
>> > > > > > >> via the pipeline options and used by gRPC ?
>> > > > > > >>
>> > > > > > >> Thanks Luke !
>> > > > > > >>
>> > > > > > >> Regards
>> > > > > > >> JB
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>> > > > > > >>
>> > > > > > >>> I have been prototyping several components towards the Beam
>> > > > technical
>> > > > > > >>> vision of being able to execute an arbitrary language using
>> an
>> > > > > > arbitrary
>> > > > > > >>> runner.
>> > > > > > >>>
>> > > > > > >>> I would like to share this overview [1] of what I have been
>> > > working
>> > > > > > >>> towards. I also share this PR [2] with a proposed API,
>> service
>> > > > > > definitions
>> > > > > > >>> and partial implementation.
>> > > > > > >>>
>> > > > > > >>> 1: https://s.apache.org/beam-fn-api
>> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
>> > > > > > >>>
>> > > > > > >>> Please comment on the overview within this thread, and any
>> > > specific
>> > > > > > code
>> > > > > > >>> comments on the PR directly.
>> > > > > > >>>
>> > > > > > >>> Luke
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >> --
>> > > > > > >> Jean-Baptiste Onofré
>> > > > > > >> jbonofre@apache.org
>> > > > > > >> http://blog.nanthrax.net
>> > > > > > >> Talend - http://www.talend.com
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
Yes, I was using a Pipeline that was:
Read(10 GiBs of KV (10,000,000 values)) -> GBK -> IdentityParDo (a batch
pipeline in the global window using the default trigger)

In Google Cloud Dataflow, the shuffle step uses the binary representation
to compare keys, so the above pipeline would normally be converted to the
following two stages:
Read -> GBK Writer
GBK Reader -> IdentityParDo

Note that the GBK Writer and GBK Reader need to use a coder to encode and
decode the value.

When using the Fn API, those two stages expanded because of the Fn Api
crossings using a gRPC Write/Read pair:
Read -> gRPC Write -> gRPC Read -> GBK Writer
GBK Reader -> gRPC Write -> gRPC Read -> IdentityParDo

In my naive prototype implementation, the coder was used to encode elements
at the gRPC steps. This meant that the coder was encoding/decoding/encoding
in the first stage and decoding/encoding/decoding in the second stage. This
tripled the amount of times the coder was being invoked per element. This
additional use of the coder accounted for ~12% (80% of the 15%) of the
extra execution time. This implementation is quite inefficient and would
benefit from merging the gRPC Read + GBK Writer into one actor and also the
GBK Reader + gRPC Write into another actor allowing for the creation of a
fast path that can skip parts of the decode/encode cycle through the coder.
By using a byte array view over the logical stream, one can minimize the
number of byte array copies which plagued my naive implementation. This can
be done by only parsing the element boundaries out of the stream to produce
those logical byte array views. I have a very rough estimate that
performing this optimization would reduce the 12% overhead to somewhere
between 4% and 6%.

The remaining 3% (15% - 12%) overhead went to many parts of gRPC:
marshalling/unmarshalling protos
handling/managing the socket
flow control
...

Finally, I did try experiments with different buffer sizes (10KB, 100KB,
1000KB), flow control (separate thread[1] vs same thread with phaser[2]),
and channel type [3] (NIO, epoll, domain socket), but coder overhead easily
dominated the differences in these other experiments.

Further analysis would need to be done to more accurately distill this down.

1:
https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
2:
https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
3:
https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java


On Tue, Jan 24, 2017 at 8:04 AM, Ismaël Mejía <ie...@gmail.com> wrote:

> Awesome job Lukasz, Excellent, I have to confess the first time I heard
> about
> the Fn API idea I was a bit incredulous, but you are making it real,
> amazing!
>
> Just one question from your document, you said that 80% of the extra (15%)
> time
> goes into encoding and decoding the data for your test case, can you expand
> in
> your current ideas to improve this? (I am not sure I completely understand
> the
> issue).
>
>
> On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > Responded inline.
> >
> > On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:
> >
> > > This is truly amazing Luke!
> > >
> > > If I understand this right, the runner executing the DoFn will delegate
> > the
> > > function code and input data (and state, coders, etc.) to the container
> > > where it will execute with the user's SDK of choice, right ?
> >
> >
> > Yes, that is correct.
> >
> >
> > > I wonder how the containers relate to the underlying engine's worker
> > > processes ? is it a 1-1, container per worker ? if there's less "work"
> > for
> > > the worker's Java process (for example) now and it becomes a sort of
> > > "dispatcher", would that change the resource allocation commonly used
> for
> > > the same Pipeline so that the worker processes would require less
> > > resources, while giving those to the container ?
> > >
> >
> > I think with the four services (control, data, state, logging) you can go
> > with a 1-1 relationship or break it up more finely grained and dedicate
> > some machines to have specific tasks. Like you could have a few machines
> > dedicated to log aggregation which all the workers push their logs to.
> > Similarly, you could have some machines that have a lot of memory which
> > would be better to be able to do shuffles in memory and then this cluster
> > of high memory machines could front the data service. I believe there is
> a
> > lot of flexibility based upon what a runner can do and what it
> specializes
> > in and believe that with more effort comes more possibilities albeit with
> > increased internal complexity.
> >
> > The layout of resources depends on whether the services and SDK
> containers
> > are co-hosted on the same machine or whether there is a different
> > architecture in play. In a co-hosted configuration, it seems likely that
> > the SDK container will get more resources but is dependent on the runner
> > and pipeline shape (shuffle heavy dominated pipelines will look different
> > then ParDo dominated pipelines).
> >
> >
> > > About executing sub-graphs, would it be true to say that as long as
> > there's
> > > no shuffle, you could keep executing in the same container ? meaning
> that
> > > the graph is broken into sub-graphs by shuffles ?
> > >
> >
> > The only thing that is required is that the Apache Beam model is
> preserved
> > so typical break points will be at shuffles and language crossing points
> > (e.g. Python ParDo -> Java ParDo). A runner is free to break up the graph
> > even more for other reasons.
> >
> >
> > > I have to dig-in deeper, so I could have more questions ;-) thanks
> Luke!
> > >
> > > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lc...@google.com.invalid>
> > > wrote:
> > >
> > > > I updated the PR description to contain the same.
> > > >
> > > > I would start by looking at the API/object model definitions found in
> > > > beam_fn_api.proto
> > > > <
> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> > > sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> > > > >
> > > >
> > > > Then depending on your interest, look at the following:
> > > > * FnHarness.java
> > > > <
> > > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> > > sdks/java/harness/src/main/java/org/apache/beam/fn/
> > harness/FnHarness.java
> > > > >
> > > > is the main entry point.
> > > > * org.apache.beam.fn.harness.data
> > > > <
> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> > > sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
> > > > >
> > > > contains the most interesting bits of code since it is able to
> > multiplex
> > > a
> > > > gRPC stream into multiple logical streams of elements bound for
> > multiple
> > > > concurrent process bundle requests. It also contains the code to take
> > > > multiple logical outbound streams and multiplex them back onto a gRPC
> > > > stream.
> > > > * org.apache.beam.runners.core
> > > > <
> > > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> > > sdks/java/harness/src/main/java/org/apache/beam/runners/core
> > > > >
> > > > contains additional runners akin to the DoFnRunner found in
> > runners-core
> > > to
> > > > support sources and gRPC endpoints.
> > > >
> > > > Unless your really interested in how domain sockets, epoll, nio
> channel
> > > > factories or how stream readiness callbacks work in gRPC, I would
> avoid
> > > the
> > > > packages org.apache.beam.fn.harness.channel and
> > > > org.apache.beam.fn.harness.stream. Similarly I would avoid
> > > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as
> > > they
> > > > don't add anything meaningful to the api.
> > > >
> > > > Code package descriptions:
> > > >
> > > > org.apache.beam.fn.harness.FnHarness: main entry point
> > > > org.apache.beam.fn.harness.control: Control service client and
> > > individual
> > > > request handlers
> > > > org.apache.beam.fn.harness.data: Data service client and logical
> > stream
> > > > multiplexing
> > > > org.apache.beam.runners.core: Additional runners akin to the
> DoFnRunner
> > > > found in runners-core to support sources and gRPC endpoints
> > > > org.apache.beam.fn.harness.logging: Logging client implementation
> and
> > > JUL
> > > > logging handler adapter
> > > > org.apache.beam.fn.harness.channel: gRPC channel management
> > > > org.apache.beam.fn.harness.stream: gRPC stream management
> > > > org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
> > > >
> > > >
> > > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
> > <klk@google.com.invalid
> > > >
> > > > wrote:
> > > >
> > > > > This is awesome! Any chance you could roadmap the PR for us with
> some
> > > > links
> > > > > into the most interesting bits?
> > > > >
> > > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> > > > > robertwb@google.com.invalid> wrote:
> > > > >
> > > > > > Also, note that we can still support the "simple" case. For
> > example,
> > > > > > if the user supplies us with a jar file (as they do now) a runner
> > > > > > could launch it as a subprocesses and communicate with it via
> this
> > > > > > same Fn API or install it in a fixed container itself--the user
> > > > > > doesn't *need* to know about docker or manually manage containers
> > > (and
> > > > > > indeed the Fn API could be used in-process, cross-process,
> > > > > > cross-container, and even cross-machine).
> > > > > >
> > > > > > However docker provides a nice cross-language way of specifying
> the
> > > > > > environment including all dependencies (especially for languages
> > like
> > > > > > Python or C where the equivalent of a cross-platform,
> > self-contained
> > > > > > jar isn't as easy to produce) and is strictly more powerful and
> > > > > > flexible (specifically it isolates the runtime environment and
> one
> > > can
> > > > > > even use it for local testing).
> > > > > >
> > > > > > Slicing a worker up like this without sacrificing performance is
> an
> > > > > > ambitious goal, but essential to the story of being able to mix
> and
> > > > > > match runners and SDKs arbitrarily, and I think this is a great
> > > start.
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
> > > <lcwik@google.com.invalid
> > > > >
> > > > > > wrote:
> > > > > > > Your correct, a docker container is created that contains the
> > > > execution
> > > > > > > environment the user wants or the user re-uses an existing one
> > > > > (allowing
> > > > > > > for a user to embed all their code/dependencies or use a
> > container
> > > > that
> > > > > > can
> > > > > > > deploy code/dependencies on demand).
> > > > > > > A user creates a pipeline saying which docker container they
> want
> > > to
> > > > > use
> > > > > > > (this starts to allow for multiple container definitions
> within a
> > > > > single
> > > > > > > pipeline to support multiple languages, versioning, ...).
> > > > > > > A runner would then be responsible for launching one or more of
> > > these
> > > > > > > containers in a cluster manager of their choice (scaling up or
> > down
> > > > the
> > > > > > > number of instances depending on demand/load/...).
> > > > > > > A runner then interacts with the docker containers over the
> gRPC
> > > > > service
> > > > > > > definitions to delegate processing to.
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
> > > > jb@nanthrax.net
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Luke,
> > > > > > >>
> > > > > > >> that's really great and very promising !
> > > > > > >>
> > > > > > >> It's really ambitious but I like the idea. Just to clarify:
> the
> > > > > purpose
> > > > > > of
> > > > > > >> using gRPC is once the docker container is running, then we
> can
> > > > > > "interact"
> > > > > > >> with the container to spread and delegate processing to the
> > docker
> > > > > > >> container, correct ?
> > > > > > >> The users/devops have to setup the docker containers as
> > > > prerequisite.
> > > > > > >> Then, the "location" of the containers (kind of container
> > > registry)
> > > > is
> > > > > > set
> > > > > > >> via the pipeline options and used by gRPC ?
> > > > > > >>
> > > > > > >> Thanks Luke !
> > > > > > >>
> > > > > > >> Regards
> > > > > > >> JB
> > > > > > >>
> > > > > > >>
> > > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> > > > > > >>
> > > > > > >>> I have been prototyping several components towards the Beam
> > > > technical
> > > > > > >>> vision of being able to execute an arbitrary language using
> an
> > > > > > arbitrary
> > > > > > >>> runner.
> > > > > > >>>
> > > > > > >>> I would like to share this overview [1] of what I have been
> > > working
> > > > > > >>> towards. I also share this PR [2] with a proposed API,
> service
> > > > > > definitions
> > > > > > >>> and partial implementation.
> > > > > > >>>
> > > > > > >>> 1: https://s.apache.org/beam-fn-api
> > > > > > >>> 2: https://github.com/apache/beam/pull/1801
> > > > > > >>>
> > > > > > >>> Please comment on the overview within this thread, and any
> > > specific
> > > > > > code
> > > > > > >>> comments on the PR directly.
> > > > > > >>>
> > > > > > >>> Luke
> > > > > > >>>
> > > > > > >>>
> > > > > > >> --
> > > > > > >> Jean-Baptiste Onofré
> > > > > > >> jbonofre@apache.org
> > > > > > >> http://blog.nanthrax.net
> > > > > > >> Talend - http://www.talend.com
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Beam Fn API

Posted by Ismaël Mejía <ie...@gmail.com>.
Awesome job Lukasz, Excellent, I have to confess the first time I heard
about
the Fn API idea I was a bit incredulous, but you are making it real,
amazing!

Just one question from your document, you said that 80% of the extra (15%)
time
goes into encoding and decoding the data for your test case, can you expand
in
your current ideas to improve this? (I am not sure I completely understand
the
issue).


On Mon, Jan 23, 2017 at 7:10 PM, Lukasz Cwik <lc...@google.com.invalid>
wrote:

> Responded inline.
>
> On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:
>
> > This is truly amazing Luke!
> >
> > If I understand this right, the runner executing the DoFn will delegate
> the
> > function code and input data (and state, coders, etc.) to the container
> > where it will execute with the user's SDK of choice, right ?
>
>
> Yes, that is correct.
>
>
> > I wonder how the containers relate to the underlying engine's worker
> > processes ? is it a 1-1, container per worker ? if there's less "work"
> for
> > the worker's Java process (for example) now and it becomes a sort of
> > "dispatcher", would that change the resource allocation commonly used for
> > the same Pipeline so that the worker processes would require less
> > resources, while giving those to the container ?
> >
>
> I think with the four services (control, data, state, logging) you can go
> with a 1-1 relationship or break it up more finely grained and dedicate
> some machines to have specific tasks. Like you could have a few machines
> dedicated to log aggregation which all the workers push their logs to.
> Similarly, you could have some machines that have a lot of memory which
> would be better to be able to do shuffles in memory and then this cluster
> of high memory machines could front the data service. I believe there is a
> lot of flexibility based upon what a runner can do and what it specializes
> in and believe that with more effort comes more possibilities albeit with
> increased internal complexity.
>
> The layout of resources depends on whether the services and SDK containers
> are co-hosted on the same machine or whether there is a different
> architecture in play. In a co-hosted configuration, it seems likely that
> the SDK container will get more resources but is dependent on the runner
> and pipeline shape (shuffle heavy dominated pipelines will look different
> then ParDo dominated pipelines).
>
>
> > About executing sub-graphs, would it be true to say that as long as
> there's
> > no shuffle, you could keep executing in the same container ? meaning that
> > the graph is broken into sub-graphs by shuffles ?
> >
>
> The only thing that is required is that the Apache Beam model is preserved
> so typical break points will be at shuffles and language crossing points
> (e.g. Python ParDo -> Java ParDo). A runner is free to break up the graph
> even more for other reasons.
>
>
> > I have to dig-in deeper, so I could have more questions ;-) thanks Luke!
> >
> > On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> >
> > > I updated the PR description to contain the same.
> > >
> > > I would start by looking at the API/object model definitions found in
> > > beam_fn_api.proto
> > > <
> > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> > sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> > > >
> > >
> > > Then depending on your interest, look at the following:
> > > * FnHarness.java
> > > <
> > > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> > sdks/java/harness/src/main/java/org/apache/beam/fn/
> harness/FnHarness.java
> > > >
> > > is the main entry point.
> > > * org.apache.beam.fn.harness.data
> > > <
> > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> > sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
> > > >
> > > contains the most interesting bits of code since it is able to
> multiplex
> > a
> > > gRPC stream into multiple logical streams of elements bound for
> multiple
> > > concurrent process bundle requests. It also contains the code to take
> > > multiple logical outbound streams and multiplex them back onto a gRPC
> > > stream.
> > > * org.apache.beam.runners.core
> > > <
> > > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> > sdks/java/harness/src/main/java/org/apache/beam/runners/core
> > > >
> > > contains additional runners akin to the DoFnRunner found in
> runners-core
> > to
> > > support sources and gRPC endpoints.
> > >
> > > Unless your really interested in how domain sockets, epoll, nio channel
> > > factories or how stream readiness callbacks work in gRPC, I would avoid
> > the
> > > packages org.apache.beam.fn.harness.channel and
> > > org.apache.beam.fn.harness.stream. Similarly I would avoid
> > > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as
> > they
> > > don't add anything meaningful to the api.
> > >
> > > Code package descriptions:
> > >
> > > org.apache.beam.fn.harness.FnHarness: main entry point
> > > org.apache.beam.fn.harness.control: Control service client and
> > individual
> > > request handlers
> > > org.apache.beam.fn.harness.data: Data service client and logical
> stream
> > > multiplexing
> > > org.apache.beam.runners.core: Additional runners akin to the DoFnRunner
> > > found in runners-core to support sources and gRPC endpoints
> > > org.apache.beam.fn.harness.logging: Logging client implementation and
> > JUL
> > > logging handler adapter
> > > org.apache.beam.fn.harness.channel: gRPC channel management
> > > org.apache.beam.fn.harness.stream: gRPC stream management
> > > org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
> > >
> > >
> > > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles
> <klk@google.com.invalid
> > >
> > > wrote:
> > >
> > > > This is awesome! Any chance you could roadmap the PR for us with some
> > > links
> > > > into the most interesting bits?
> > > >
> > > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> > > > robertwb@google.com.invalid> wrote:
> > > >
> > > > > Also, note that we can still support the "simple" case. For
> example,
> > > > > if the user supplies us with a jar file (as they do now) a runner
> > > > > could launch it as a subprocesses and communicate with it via this
> > > > > same Fn API or install it in a fixed container itself--the user
> > > > > doesn't *need* to know about docker or manually manage containers
> > (and
> > > > > indeed the Fn API could be used in-process, cross-process,
> > > > > cross-container, and even cross-machine).
> > > > >
> > > > > However docker provides a nice cross-language way of specifying the
> > > > > environment including all dependencies (especially for languages
> like
> > > > > Python or C where the equivalent of a cross-platform,
> self-contained
> > > > > jar isn't as easy to produce) and is strictly more powerful and
> > > > > flexible (specifically it isolates the runtime environment and one
> > can
> > > > > even use it for local testing).
> > > > >
> > > > > Slicing a worker up like this without sacrificing performance is an
> > > > > ambitious goal, but essential to the story of being able to mix and
> > > > > match runners and SDKs arbitrarily, and I think this is a great
> > start.
> > > > >
> > > > >
> > > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
> > <lcwik@google.com.invalid
> > > >
> > > > > wrote:
> > > > > > Your correct, a docker container is created that contains the
> > > execution
> > > > > > environment the user wants or the user re-uses an existing one
> > > > (allowing
> > > > > > for a user to embed all their code/dependencies or use a
> container
> > > that
> > > > > can
> > > > > > deploy code/dependencies on demand).
> > > > > > A user creates a pipeline saying which docker container they want
> > to
> > > > use
> > > > > > (this starts to allow for multiple container definitions within a
> > > > single
> > > > > > pipeline to support multiple languages, versioning, ...).
> > > > > > A runner would then be responsible for launching one or more of
> > these
> > > > > > containers in a cluster manager of their choice (scaling up or
> down
> > > the
> > > > > > number of instances depending on demand/load/...).
> > > > > > A runner then interacts with the docker containers over the gRPC
> > > > service
> > > > > > definitions to delegate processing to.
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
> > > jb@nanthrax.net
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Luke,
> > > > > >>
> > > > > >> that's really great and very promising !
> > > > > >>
> > > > > >> It's really ambitious but I like the idea. Just to clarify: the
> > > > purpose
> > > > > of
> > > > > >> using gRPC is once the docker container is running, then we can
> > > > > "interact"
> > > > > >> with the container to spread and delegate processing to the
> docker
> > > > > >> container, correct ?
> > > > > >> The users/devops have to setup the docker containers as
> > > prerequisite.
> > > > > >> Then, the "location" of the containers (kind of container
> > registry)
> > > is
> > > > > set
> > > > > >> via the pipeline options and used by gRPC ?
> > > > > >>
> > > > > >> Thanks Luke !
> > > > > >>
> > > > > >> Regards
> > > > > >> JB
> > > > > >>
> > > > > >>
> > > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> > > > > >>
> > > > > >>> I have been prototyping several components towards the Beam
> > > technical
> > > > > >>> vision of being able to execute an arbitrary language using an
> > > > > arbitrary
> > > > > >>> runner.
> > > > > >>>
> > > > > >>> I would like to share this overview [1] of what I have been
> > working
> > > > > >>> towards. I also share this PR [2] with a proposed API, service
> > > > > definitions
> > > > > >>> and partial implementation.
> > > > > >>>
> > > > > >>> 1: https://s.apache.org/beam-fn-api
> > > > > >>> 2: https://github.com/apache/beam/pull/1801
> > > > > >>>
> > > > > >>> Please comment on the overview within this thread, and any
> > specific
> > > > > code
> > > > > >>> comments on the PR directly.
> > > > > >>>
> > > > > >>> Luke
> > > > > >>>
> > > > > >>>
> > > > > >> --
> > > > > >> Jean-Baptiste Onofré
> > > > > >> jbonofre@apache.org
> > > > > >> http://blog.nanthrax.net
> > > > > >> Talend - http://www.talend.com
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
Responded inline.

On Sat, Jan 21, 2017 at 8:20 AM, Amit Sela <am...@gmail.com> wrote:

> This is truly amazing Luke!
>
> If I understand this right, the runner executing the DoFn will delegate the
> function code and input data (and state, coders, etc.) to the container
> where it will execute with the user's SDK of choice, right ?


Yes, that is correct.


> I wonder how the containers relate to the underlying engine's worker
> processes ? is it a 1-1, container per worker ? if there's less "work" for
> the worker's Java process (for example) now and it becomes a sort of
> "dispatcher", would that change the resource allocation commonly used for
> the same Pipeline so that the worker processes would require less
> resources, while giving those to the container ?
>

I think with the four services (control, data, state, logging) you can go
with a 1-1 relationship or break it up more finely grained and dedicate
some machines to have specific tasks. Like you could have a few machines
dedicated to log aggregation which all the workers push their logs to.
Similarly, you could have some machines that have a lot of memory which
would be better to be able to do shuffles in memory and then this cluster
of high memory machines could front the data service. I believe there is a
lot of flexibility based upon what a runner can do and what it specializes
in and believe that with more effort comes more possibilities albeit with
increased internal complexity.

The layout of resources depends on whether the services and SDK containers
are co-hosted on the same machine or whether there is a different
architecture in play. In a co-hosted configuration, it seems likely that
the SDK container will get more resources but is dependent on the runner
and pipeline shape (shuffle heavy dominated pipelines will look different
then ParDo dominated pipelines).


> About executing sub-graphs, would it be true to say that as long as there's
> no shuffle, you could keep executing in the same container ? meaning that
> the graph is broken into sub-graphs by shuffles ?
>

The only thing that is required is that the Apache Beam model is preserved
so typical break points will be at shuffles and language crossing points
(e.g. Python ParDo -> Java ParDo). A runner is free to break up the graph
even more for other reasons.


> I have to dig-in deeper, so I could have more questions ;-) thanks Luke!
>
> On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > I updated the PR description to contain the same.
> >
> > I would start by looking at the API/object model definitions found in
> > beam_fn_api.proto
> > <
> > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> > >
> >
> > Then depending on your interest, look at the following:
> > * FnHarness.java
> > <
> > https://github.com/lukecwik/incubator-beam/blob/fn_api/
> sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
> > >
> > is the main entry point.
> > * org.apache.beam.fn.harness.data
> > <
> > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
> > >
> > contains the most interesting bits of code since it is able to multiplex
> a
> > gRPC stream into multiple logical streams of elements bound for multiple
> > concurrent process bundle requests. It also contains the code to take
> > multiple logical outbound streams and multiplex them back onto a gRPC
> > stream.
> > * org.apache.beam.runners.core
> > <
> > https://github.com/lukecwik/incubator-beam/tree/fn_api/
> sdks/java/harness/src/main/java/org/apache/beam/runners/core
> > >
> > contains additional runners akin to the DoFnRunner found in runners-core
> to
> > support sources and gRPC endpoints.
> >
> > Unless your really interested in how domain sockets, epoll, nio channel
> > factories or how stream readiness callbacks work in gRPC, I would avoid
> the
> > packages org.apache.beam.fn.harness.channel and
> > org.apache.beam.fn.harness.stream. Similarly I would avoid
> > org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as
> they
> > don't add anything meaningful to the api.
> >
> > Code package descriptions:
> >
> > org.apache.beam.fn.harness.FnHarness: main entry point
> > org.apache.beam.fn.harness.control: Control service client and
> individual
> > request handlers
> > org.apache.beam.fn.harness.data: Data service client and logical stream
> > multiplexing
> > org.apache.beam.runners.core: Additional runners akin to the DoFnRunner
> > found in runners-core to support sources and gRPC endpoints
> > org.apache.beam.fn.harness.logging: Logging client implementation and
> JUL
> > logging handler adapter
> > org.apache.beam.fn.harness.channel: gRPC channel management
> > org.apache.beam.fn.harness.stream: gRPC stream management
> > org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
> >
> >
> > On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <klk@google.com.invalid
> >
> > wrote:
> >
> > > This is awesome! Any chance you could roadmap the PR for us with some
> > links
> > > into the most interesting bits?
> > >
> > > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> > > robertwb@google.com.invalid> wrote:
> > >
> > > > Also, note that we can still support the "simple" case. For example,
> > > > if the user supplies us with a jar file (as they do now) a runner
> > > > could launch it as a subprocesses and communicate with it via this
> > > > same Fn API or install it in a fixed container itself--the user
> > > > doesn't *need* to know about docker or manually manage containers
> (and
> > > > indeed the Fn API could be used in-process, cross-process,
> > > > cross-container, and even cross-machine).
> > > >
> > > > However docker provides a nice cross-language way of specifying the
> > > > environment including all dependencies (especially for languages like
> > > > Python or C where the equivalent of a cross-platform, self-contained
> > > > jar isn't as easy to produce) and is strictly more powerful and
> > > > flexible (specifically it isolates the runtime environment and one
> can
> > > > even use it for local testing).
> > > >
> > > > Slicing a worker up like this without sacrificing performance is an
> > > > ambitious goal, but essential to the story of being able to mix and
> > > > match runners and SDKs arbitrarily, and I think this is a great
> start.
> > > >
> > > >
> > > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik
> <lcwik@google.com.invalid
> > >
> > > > wrote:
> > > > > Your correct, a docker container is created that contains the
> > execution
> > > > > environment the user wants or the user re-uses an existing one
> > > (allowing
> > > > > for a user to embed all their code/dependencies or use a container
> > that
> > > > can
> > > > > deploy code/dependencies on demand).
> > > > > A user creates a pipeline saying which docker container they want
> to
> > > use
> > > > > (this starts to allow for multiple container definitions within a
> > > single
> > > > > pipeline to support multiple languages, versioning, ...).
> > > > > A runner would then be responsible for launching one or more of
> these
> > > > > containers in a cluster manager of their choice (scaling up or down
> > the
> > > > > number of instances depending on demand/load/...).
> > > > > A runner then interacts with the docker containers over the gRPC
> > > service
> > > > > definitions to delegate processing to.
> > > > >
> > > > >
> > > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
> > jb@nanthrax.net
> > > >
> > > > > wrote:
> > > > >
> > > > >> Hi Luke,
> > > > >>
> > > > >> that's really great and very promising !
> > > > >>
> > > > >> It's really ambitious but I like the idea. Just to clarify: the
> > > purpose
> > > > of
> > > > >> using gRPC is once the docker container is running, then we can
> > > > "interact"
> > > > >> with the container to spread and delegate processing to the docker
> > > > >> container, correct ?
> > > > >> The users/devops have to setup the docker containers as
> > prerequisite.
> > > > >> Then, the "location" of the containers (kind of container
> registry)
> > is
> > > > set
> > > > >> via the pipeline options and used by gRPC ?
> > > > >>
> > > > >> Thanks Luke !
> > > > >>
> > > > >> Regards
> > > > >> JB
> > > > >>
> > > > >>
> > > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> > > > >>
> > > > >>> I have been prototyping several components towards the Beam
> > technical
> > > > >>> vision of being able to execute an arbitrary language using an
> > > > arbitrary
> > > > >>> runner.
> > > > >>>
> > > > >>> I would like to share this overview [1] of what I have been
> working
> > > > >>> towards. I also share this PR [2] with a proposed API, service
> > > > definitions
> > > > >>> and partial implementation.
> > > > >>>
> > > > >>> 1: https://s.apache.org/beam-fn-api
> > > > >>> 2: https://github.com/apache/beam/pull/1801
> > > > >>>
> > > > >>> Please comment on the overview within this thread, and any
> specific
> > > > code
> > > > >>> comments on the PR directly.
> > > > >>>
> > > > >>> Luke
> > > > >>>
> > > > >>>
> > > > >> --
> > > > >> Jean-Baptiste Onofré
> > > > >> jbonofre@apache.org
> > > > >> http://blog.nanthrax.net
> > > > >> Talend - http://www.talend.com
> > > > >>
> > > >
> > >
> >
>

Re: Beam Fn API

Posted by Amit Sela <am...@gmail.com>.
This is truly amazing Luke!

If I understand this right, the runner executing the DoFn will delegate the
function code and input data (and state, coders, etc.) to the container
where it will execute with the user's SDK of choice, right ?

I wonder how the containers relate to the underlying engine's worker
processes ? is it a 1-1, container per worker ? if there's less "work" for
the worker's Java process (for example) now and it becomes a sort of
"dispatcher", would that change the resource allocation commonly used for
the same Pipeline so that the worker processes would require less
resources, while giving those to the container ?

About executing sub-graphs, would it be true to say that as long as there's
no shuffle, you could keep executing in the same container ? meaning that
the graph is broken into sub-graphs by shuffles ?

I have to dig-in deeper, so I could have more questions ;-) thanks Luke!

On Sat, Jan 21, 2017 at 1:52 AM Lukasz Cwik <lc...@google.com.invalid>
wrote:

> I updated the PR description to contain the same.
>
> I would start by looking at the API/object model definitions found in
> beam_fn_api.proto
> <
> https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
> >
>
> Then depending on your interest, look at the following:
> * FnHarness.java
> <
> https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
> >
> is the main entry point.
> * org.apache.beam.fn.harness.data
> <
> https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data
> >
> contains the most interesting bits of code since it is able to multiplex a
> gRPC stream into multiple logical streams of elements bound for multiple
> concurrent process bundle requests. It also contains the code to take
> multiple logical outbound streams and multiplex them back onto a gRPC
> stream.
> * org.apache.beam.runners.core
> <
> https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core
> >
> contains additional runners akin to the DoFnRunner found in runners-core to
> support sources and gRPC endpoints.
>
> Unless your really interested in how domain sockets, epoll, nio channel
> factories or how stream readiness callbacks work in gRPC, I would avoid the
> packages org.apache.beam.fn.harness.channel and
> org.apache.beam.fn.harness.stream. Similarly I would avoid
> org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as they
> don't add anything meaningful to the api.
>
> Code package descriptions:
>
> org.apache.beam.fn.harness.FnHarness: main entry point
> org.apache.beam.fn.harness.control: Control service client and individual
> request handlers
> org.apache.beam.fn.harness.data: Data service client and logical stream
> multiplexing
> org.apache.beam.runners.core: Additional runners akin to the DoFnRunner
> found in runners-core to support sources and gRPC endpoints
> org.apache.beam.fn.harness.logging: Logging client implementation and JUL
> logging handler adapter
> org.apache.beam.fn.harness.channel: gRPC channel management
> org.apache.beam.fn.harness.stream: gRPC stream management
> org.apache.beam.fn.harness.fn: Java 8 functional interface extensions
>
>
> On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <kl...@google.com.invalid>
> wrote:
>
> > This is awesome! Any chance you could roadmap the PR for us with some
> links
> > into the most interesting bits?
> >
> > On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> > robertwb@google.com.invalid> wrote:
> >
> > > Also, note that we can still support the "simple" case. For example,
> > > if the user supplies us with a jar file (as they do now) a runner
> > > could launch it as a subprocesses and communicate with it via this
> > > same Fn API or install it in a fixed container itself--the user
> > > doesn't *need* to know about docker or manually manage containers (and
> > > indeed the Fn API could be used in-process, cross-process,
> > > cross-container, and even cross-machine).
> > >
> > > However docker provides a nice cross-language way of specifying the
> > > environment including all dependencies (especially for languages like
> > > Python or C where the equivalent of a cross-platform, self-contained
> > > jar isn't as easy to produce) and is strictly more powerful and
> > > flexible (specifically it isolates the runtime environment and one can
> > > even use it for local testing).
> > >
> > > Slicing a worker up like this without sacrificing performance is an
> > > ambitious goal, but essential to the story of being able to mix and
> > > match runners and SDKs arbitrarily, and I think this is a great start.
> > >
> > >
> > > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lcwik@google.com.invalid
> >
> > > wrote:
> > > > Your correct, a docker container is created that contains the
> execution
> > > > environment the user wants or the user re-uses an existing one
> > (allowing
> > > > for a user to embed all their code/dependencies or use a container
> that
> > > can
> > > > deploy code/dependencies on demand).
> > > > A user creates a pipeline saying which docker container they want to
> > use
> > > > (this starts to allow for multiple container definitions within a
> > single
> > > > pipeline to support multiple languages, versioning, ...).
> > > > A runner would then be responsible for launching one or more of these
> > > > containers in a cluster manager of their choice (scaling up or down
> the
> > > > number of instances depending on demand/load/...).
> > > > A runner then interacts with the docker containers over the gRPC
> > service
> > > > definitions to delegate processing to.
> > > >
> > > >
> > > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <
> jb@nanthrax.net
> > >
> > > > wrote:
> > > >
> > > >> Hi Luke,
> > > >>
> > > >> that's really great and very promising !
> > > >>
> > > >> It's really ambitious but I like the idea. Just to clarify: the
> > purpose
> > > of
> > > >> using gRPC is once the docker container is running, then we can
> > > "interact"
> > > >> with the container to spread and delegate processing to the docker
> > > >> container, correct ?
> > > >> The users/devops have to setup the docker containers as
> prerequisite.
> > > >> Then, the "location" of the containers (kind of container registry)
> is
> > > set
> > > >> via the pipeline options and used by gRPC ?
> > > >>
> > > >> Thanks Luke !
> > > >>
> > > >> Regards
> > > >> JB
> > > >>
> > > >>
> > > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> > > >>
> > > >>> I have been prototyping several components towards the Beam
> technical
> > > >>> vision of being able to execute an arbitrary language using an
> > > arbitrary
> > > >>> runner.
> > > >>>
> > > >>> I would like to share this overview [1] of what I have been working
> > > >>> towards. I also share this PR [2] with a proposed API, service
> > > definitions
> > > >>> and partial implementation.
> > > >>>
> > > >>> 1: https://s.apache.org/beam-fn-api
> > > >>> 2: https://github.com/apache/beam/pull/1801
> > > >>>
> > > >>> Please comment on the overview within this thread, and any specific
> > > code
> > > >>> comments on the PR directly.
> > > >>>
> > > >>> Luke
> > > >>>
> > > >>>
> > > >> --
> > > >> Jean-Baptiste Onofré
> > > >> jbonofre@apache.org
> > > >> http://blog.nanthrax.net
> > > >> Talend - http://www.talend.com
> > > >>
> > >
> >
>

Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
I updated the PR description to contain the same.

I would start by looking at the API/object model definitions found in
beam_fn_api.proto
<https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/common/fn-api/src/main/proto/beam_fn_api.proto>

Then depending on your interest, look at the following:
* FnHarness.java
<https://github.com/lukecwik/incubator-beam/blob/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java>
is the main entry point.
* org.apache.beam.fn.harness.data
<https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data>
contains the most interesting bits of code since it is able to multiplex a
gRPC stream into multiple logical streams of elements bound for multiple
concurrent process bundle requests. It also contains the code to take
multiple logical outbound streams and multiplex them back onto a gRPC
stream.
* org.apache.beam.runners.core
<https://github.com/lukecwik/incubator-beam/tree/fn_api/sdks/java/harness/src/main/java/org/apache/beam/runners/core>
contains additional runners akin to the DoFnRunner found in runners-core to
support sources and gRPC endpoints.

Unless your really interested in how domain sockets, epoll, nio channel
factories or how stream readiness callbacks work in gRPC, I would avoid the
packages org.apache.beam.fn.harness.channel and
org.apache.beam.fn.harness.stream. Similarly I would avoid
org.apache.beam.fn.harness.fn and org.apache.beam.fn.harness.fake as they
don't add anything meaningful to the api.

Code package descriptions:

org.apache.beam.fn.harness.FnHarness: main entry point
org.apache.beam.fn.harness.control: Control service client and individual
request handlers
org.apache.beam.fn.harness.data: Data service client and logical stream
multiplexing
org.apache.beam.runners.core: Additional runners akin to the DoFnRunner
found in runners-core to support sources and gRPC endpoints
org.apache.beam.fn.harness.logging: Logging client implementation and JUL
logging handler adapter
org.apache.beam.fn.harness.channel: gRPC channel management
org.apache.beam.fn.harness.stream: gRPC stream management
org.apache.beam.fn.harness.fn: Java 8 functional interface extensions


On Fri, Jan 20, 2017 at 1:26 PM, Kenneth Knowles <kl...@google.com.invalid>
wrote:

> This is awesome! Any chance you could roadmap the PR for us with some links
> into the most interesting bits?
>
> On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
> robertwb@google.com.invalid> wrote:
>
> > Also, note that we can still support the "simple" case. For example,
> > if the user supplies us with a jar file (as they do now) a runner
> > could launch it as a subprocesses and communicate with it via this
> > same Fn API or install it in a fixed container itself--the user
> > doesn't *need* to know about docker or manually manage containers (and
> > indeed the Fn API could be used in-process, cross-process,
> > cross-container, and even cross-machine).
> >
> > However docker provides a nice cross-language way of specifying the
> > environment including all dependencies (especially for languages like
> > Python or C where the equivalent of a cross-platform, self-contained
> > jar isn't as easy to produce) and is strictly more powerful and
> > flexible (specifically it isolates the runtime environment and one can
> > even use it for local testing).
> >
> > Slicing a worker up like this without sacrificing performance is an
> > ambitious goal, but essential to the story of being able to mix and
> > match runners and SDKs arbitrarily, and I think this is a great start.
> >
> >
> > On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> > > Your correct, a docker container is created that contains the execution
> > > environment the user wants or the user re-uses an existing one
> (allowing
> > > for a user to embed all their code/dependencies or use a container that
> > can
> > > deploy code/dependencies on demand).
> > > A user creates a pipeline saying which docker container they want to
> use
> > > (this starts to allow for multiple container definitions within a
> single
> > > pipeline to support multiple languages, versioning, ...).
> > > A runner would then be responsible for launching one or more of these
> > > containers in a cluster manager of their choice (scaling up or down the
> > > number of instances depending on demand/load/...).
> > > A runner then interacts with the docker containers over the gRPC
> service
> > > definitions to delegate processing to.
> > >
> > >
> > > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <jb@nanthrax.net
> >
> > > wrote:
> > >
> > >> Hi Luke,
> > >>
> > >> that's really great and very promising !
> > >>
> > >> It's really ambitious but I like the idea. Just to clarify: the
> purpose
> > of
> > >> using gRPC is once the docker container is running, then we can
> > "interact"
> > >> with the container to spread and delegate processing to the docker
> > >> container, correct ?
> > >> The users/devops have to setup the docker containers as prerequisite.
> > >> Then, the "location" of the containers (kind of container registry) is
> > set
> > >> via the pipeline options and used by gRPC ?
> > >>
> > >> Thanks Luke !
> > >>
> > >> Regards
> > >> JB
> > >>
> > >>
> > >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> > >>
> > >>> I have been prototyping several components towards the Beam technical
> > >>> vision of being able to execute an arbitrary language using an
> > arbitrary
> > >>> runner.
> > >>>
> > >>> I would like to share this overview [1] of what I have been working
> > >>> towards. I also share this PR [2] with a proposed API, service
> > definitions
> > >>> and partial implementation.
> > >>>
> > >>> 1: https://s.apache.org/beam-fn-api
> > >>> 2: https://github.com/apache/beam/pull/1801
> > >>>
> > >>> Please comment on the overview within this thread, and any specific
> > code
> > >>> comments on the PR directly.
> > >>>
> > >>> Luke
> > >>>
> > >>>
> > >> --
> > >> Jean-Baptiste Onofré
> > >> jbonofre@apache.org
> > >> http://blog.nanthrax.net
> > >> Talend - http://www.talend.com
> > >>
> >
>

Re: Beam Fn API

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
This is awesome! Any chance you could roadmap the PR for us with some links
into the most interesting bits?

On Fri, Jan 20, 2017 at 12:19 PM, Robert Bradshaw <
robertwb@google.com.invalid> wrote:

> Also, note that we can still support the "simple" case. For example,
> if the user supplies us with a jar file (as they do now) a runner
> could launch it as a subprocesses and communicate with it via this
> same Fn API or install it in a fixed container itself--the user
> doesn't *need* to know about docker or manually manage containers (and
> indeed the Fn API could be used in-process, cross-process,
> cross-container, and even cross-machine).
>
> However docker provides a nice cross-language way of specifying the
> environment including all dependencies (especially for languages like
> Python or C where the equivalent of a cross-platform, self-contained
> jar isn't as easy to produce) and is strictly more powerful and
> flexible (specifically it isolates the runtime environment and one can
> even use it for local testing).
>
> Slicing a worker up like this without sacrificing performance is an
> ambitious goal, but essential to the story of being able to mix and
> match runners and SDKs arbitrarily, and I think this is a great start.
>
>
> On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lc...@google.com.invalid>
> wrote:
> > Your correct, a docker container is created that contains the execution
> > environment the user wants or the user re-uses an existing one (allowing
> > for a user to embed all their code/dependencies or use a container that
> can
> > deploy code/dependencies on demand).
> > A user creates a pipeline saying which docker container they want to use
> > (this starts to allow for multiple container definitions within a single
> > pipeline to support multiple languages, versioning, ...).
> > A runner would then be responsible for launching one or more of these
> > containers in a cluster manager of their choice (scaling up or down the
> > number of instances depending on demand/load/...).
> > A runner then interacts with the docker containers over the gRPC service
> > definitions to delegate processing to.
> >
> >
> > On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> > wrote:
> >
> >> Hi Luke,
> >>
> >> that's really great and very promising !
> >>
> >> It's really ambitious but I like the idea. Just to clarify: the purpose
> of
> >> using gRPC is once the docker container is running, then we can
> "interact"
> >> with the container to spread and delegate processing to the docker
> >> container, correct ?
> >> The users/devops have to setup the docker containers as prerequisite.
> >> Then, the "location" of the containers (kind of container registry) is
> set
> >> via the pipeline options and used by gRPC ?
> >>
> >> Thanks Luke !
> >>
> >> Regards
> >> JB
> >>
> >>
> >> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> >>
> >>> I have been prototyping several components towards the Beam technical
> >>> vision of being able to execute an arbitrary language using an
> arbitrary
> >>> runner.
> >>>
> >>> I would like to share this overview [1] of what I have been working
> >>> towards. I also share this PR [2] with a proposed API, service
> definitions
> >>> and partial implementation.
> >>>
> >>> 1: https://s.apache.org/beam-fn-api
> >>> 2: https://github.com/apache/beam/pull/1801
> >>>
> >>> Please comment on the overview within this thread, and any specific
> code
> >>> comments on the PR directly.
> >>>
> >>> Luke
> >>>
> >>>
> >> --
> >> Jean-Baptiste Onofré
> >> jbonofre@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
>

Re: Beam Fn API

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
Also, note that we can still support the "simple" case. For example,
if the user supplies us with a jar file (as they do now) a runner
could launch it as a subprocesses and communicate with it via this
same Fn API or install it in a fixed container itself--the user
doesn't *need* to know about docker or manually manage containers (and
indeed the Fn API could be used in-process, cross-process,
cross-container, and even cross-machine).

However docker provides a nice cross-language way of specifying the
environment including all dependencies (especially for languages like
Python or C where the equivalent of a cross-platform, self-contained
jar isn't as easy to produce) and is strictly more powerful and
flexible (specifically it isolates the runtime environment and one can
even use it for local testing).

Slicing a worker up like this without sacrificing performance is an
ambitious goal, but essential to the story of being able to mix and
match runners and SDKs arbitrarily, and I think this is a great start.


On Fri, Jan 20, 2017 at 9:39 AM, Lukasz Cwik <lc...@google.com.invalid> wrote:
> Your correct, a docker container is created that contains the execution
> environment the user wants or the user re-uses an existing one (allowing
> for a user to embed all their code/dependencies or use a container that can
> deploy code/dependencies on demand).
> A user creates a pipeline saying which docker container they want to use
> (this starts to allow for multiple container definitions within a single
> pipeline to support multiple languages, versioning, ...).
> A runner would then be responsible for launching one or more of these
> containers in a cluster manager of their choice (scaling up or down the
> number of instances depending on demand/load/...).
> A runner then interacts with the docker containers over the gRPC service
> definitions to delegate processing to.
>
>
> On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
>> Hi Luke,
>>
>> that's really great and very promising !
>>
>> It's really ambitious but I like the idea. Just to clarify: the purpose of
>> using gRPC is once the docker container is running, then we can "interact"
>> with the container to spread and delegate processing to the docker
>> container, correct ?
>> The users/devops have to setup the docker containers as prerequisite.
>> Then, the "location" of the containers (kind of container registry) is set
>> via the pipeline options and used by gRPC ?
>>
>> Thanks Luke !
>>
>> Regards
>> JB
>>
>>
>> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>>
>>> I have been prototyping several components towards the Beam technical
>>> vision of being able to execute an arbitrary language using an arbitrary
>>> runner.
>>>
>>> I would like to share this overview [1] of what I have been working
>>> towards. I also share this PR [2] with a proposed API, service definitions
>>> and partial implementation.
>>>
>>> 1: https://s.apache.org/beam-fn-api
>>> 2: https://github.com/apache/beam/pull/1801
>>>
>>> Please comment on the overview within this thread, and any specific code
>>> comments on the PR directly.
>>>
>>> Luke
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>

Re: Beam Fn API

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
Your correct, a docker container is created that contains the execution
environment the user wants or the user re-uses an existing one (allowing
for a user to embed all their code/dependencies or use a container that can
deploy code/dependencies on demand).
A user creates a pipeline saying which docker container they want to use
(this starts to allow for multiple container definitions within a single
pipeline to support multiple languages, versioning, ...).
A runner would then be responsible for launching one or more of these
containers in a cluster manager of their choice (scaling up or down the
number of instances depending on demand/load/...).
A runner then interacts with the docker containers over the gRPC service
definitions to delegate processing to.


On Fri, Jan 20, 2017 at 4:56 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Luke,
>
> that's really great and very promising !
>
> It's really ambitious but I like the idea. Just to clarify: the purpose of
> using gRPC is once the docker container is running, then we can "interact"
> with the container to spread and delegate processing to the docker
> container, correct ?
> The users/devops have to setup the docker containers as prerequisite.
> Then, the "location" of the containers (kind of container registry) is set
> via the pipeline options and used by gRPC ?
>
> Thanks Luke !
>
> Regards
> JB
>
>
> On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
>
>> I have been prototyping several components towards the Beam technical
>> vision of being able to execute an arbitrary language using an arbitrary
>> runner.
>>
>> I would like to share this overview [1] of what I have been working
>> towards. I also share this PR [2] with a proposed API, service definitions
>> and partial implementation.
>>
>> 1: https://s.apache.org/beam-fn-api
>> 2: https://github.com/apache/beam/pull/1801
>>
>> Please comment on the overview within this thread, and any specific code
>> comments on the PR directly.
>>
>> Luke
>>
>>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: Beam Fn API

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Luke,

that's really great and very promising !

It's really ambitious but I like the idea. Just to clarify: the purpose 
of using gRPC is once the docker container is running, then we can 
"interact" with the container to spread and delegate processing to the 
docker container, correct ?
The users/devops have to setup the docker containers as prerequisite. 
Then, the "location" of the containers (kind of container registry) is 
set via the pipeline options and used by gRPC ?

Thanks Luke !

Regards
JB

On 01/19/2017 03:56 PM, Lukasz Cwik wrote:
> I have been prototyping several components towards the Beam technical
> vision of being able to execute an arbitrary language using an arbitrary
> runner.
>
> I would like to share this overview [1] of what I have been working
> towards. I also share this PR [2] with a proposed API, service definitions
> and partial implementation.
>
> 1: https://s.apache.org/beam-fn-api
> 2: https://github.com/apache/beam/pull/1801
>
> Please comment on the overview within this thread, and any specific code
> comments on the PR directly.
>
> Luke
>

-- 
Jean-Baptiste Onofr�
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Beam Fn API

Posted by Dan Halperin <dh...@google.com.INVALID>.
"relatively little extra work" once the base APIs are implemented.

On Thu, Jan 19, 2017 at 11:26 PM, Dan Halperin <dh...@google.com> wrote:

> This is an extremely ambitious part of the technical vision. I think it's
> a lot of work, but well worth it -- Python-SDK-on-Java-runner with
> relatively extra work? I don't care what the overhead is, this is making
> the impossible possible.
>
> On Thu, Jan 19, 2017 at 3:56 PM, Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
>> I have been prototyping several components towards the Beam technical
>> vision of being able to execute an arbitrary language using an arbitrary
>> runner.
>>
>> I would like to share this overview [1] of what I have been working
>> towards. I also share this PR [2] with a proposed API, service definitions
>> and partial implementation.
>>
>> 1: https://s.apache.org/beam-fn-api
>> 2: https://github.com/apache/beam/pull/1801
>>
>> Please comment on the overview within this thread, and any specific code
>> comments on the PR directly.
>>
>> Luke
>>
>
>

Re: Beam Fn API

Posted by Dan Halperin <dh...@google.com.INVALID>.
This is an extremely ambitious part of the technical vision. I think it's a
lot of work, but well worth it -- Python-SDK-on-Java-runner with relatively
extra work? I don't care what the overhead is, this is making the
impossible possible.

On Thu, Jan 19, 2017 at 3:56 PM, Lukasz Cwik <lc...@google.com.invalid>
wrote:

> I have been prototyping several components towards the Beam technical
> vision of being able to execute an arbitrary language using an arbitrary
> runner.
>
> I would like to share this overview [1] of what I have been working
> towards. I also share this PR [2] with a proposed API, service definitions
> and partial implementation.
>
> 1: https://s.apache.org/beam-fn-api
> 2: https://github.com/apache/beam/pull/1801
>
> Please comment on the overview within this thread, and any specific code
> comments on the PR directly.
>
> Luke
>