You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Kenneth Knowles <kl...@google.com.INVALID> on 2016/06/16 19:12:57 UTC

[DISCUSS] Beam data plane serialization tech

Hello everyone!

We are busily working on a Runner API (for building and transmitting
pipelines)
and a Fn API (for invoking user-defined functions found within pipelines) as
outlined in the Beam technical vision [1]. Both of these require a
language-independent serialization technology for interoperability between
SDKs
and runners.

The Fn API includes a high-bandwidth data plane where bundles are
transmitted
via some serialization/RPC envelope (inside the envelope, the stream of
elements is encoded with a coder) to transfer bundles between the runner and
the SDK, so performance is extremely important. There are many choices for
high
performance serialization, and we would like to start the conversation about
what serialization technology is best for Beam.

The goal of this discussion is to arrive at consensus on the question: What
serialization technology should we use for the data plane envelope of the Fn
API?

To facilitate community discussion, we looked at the available technologies
and
tried to narrow the choices based on three criteria:

 - Performance: What is the size of serialized data? How do we expect the
   technology to affect pipeline speed and cost? etc

 - Language support: Does the technology support the most widespread
language
   for data processing? Does it have a vibrant ecosystem of contributed
   language bindings? etc

 - Community: What is the adoption of the technology? How mature is it? How
   active is development? How is the documentation? etc

Given these criteria, we came up with four technologies that are good
contenders. All have similar & adequate schema capabilities.

 - Apache Avro: Does not require code gen, but embedding the schema in the
data
   could be an issue. Very popular.

 - Apache Thrift: Probably a bit faster and compact than Avro. A huge
number of
   language supported.

 - Protocol Buffers 3: Incorporates the lessons that Google has learned
through
   long-term use of Protocol Buffers.

 - FlatBuffers: Some benchmarks imply great performance from the zero-copy
mmap
   idea. We would need to run representative experiments.

I want to emphasize that this is a community decision, and this thread is
just
the conversation starter for us all to weigh in. We just wanted to do some
legwork to focus the discussion if we could.

And there's a minor follow-up question: Once we settle here, is that
technology
also suitable for the low-bandwidth Runner API for defining pipelines, or
does
anyone think we need to consider a second technology (like JSON) for
usability
reasons?

[1]
https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38

Re: [DISCUSS] Beam data plane serialization tech

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
I'm still not 100% convinced, but I can see your argument better now.  I haven't used gRPC so I don't know how hard it would be to add auth there, but I know form experience that it is "I'd rather poke myself in the eye" painful with thrift and there are a lot of limitations when you do try to use it.  If gRPC can do it out of the box for all of the languages we want to support then yes lets go with it.  But if it is anything like thrift I personally would rather write my own muxer/demuxer over stdin/stdout.
 - Bobby 

    On Tuesday, June 28, 2016 10:34 AM, Lukasz Cwik <lc...@google.com> wrote:
 

 I think we will want an RPC server because the types of interactions between the runner and SDK aren't that simple:process some work (for some definition of work)split that sourceget progress of that sourceSDK requests a side inputSDK wants to read/write state...
As for security, I think that we could have a trusted auth mode (all communications are done in a trusted environment, e.g. localhost to localhost communication, private network) and also potentially support an OAuth2 client credentials grant being used as a bearer token.
The reason I'm suggesting this is because I believe we should have a docker container dedicated to the users code + SDK which uses the Fn API to communicate with the runner....
On Tue, Jun 28, 2016 at 7:17 AM, Bobby Evans <ev...@yahoo-inc.com.invalid> wrote:

What type of RPC do we really need?  Is this envisioned as a many to one connection situation?  What other processes exactly are going to be attaching to this RPC server and sending/receiving data?  If the non-java process opens up a port and listens there are all kinds of potential security concerns that will need to be addressed (encryption, authentication, authorization, etc.).  Using just stdin, stdout, and stderr is standard everywhere that java runs.  Has some decent security built in, and all we need is a protocol to wrap the payload in, which is what I thought we were talking about.
 I am fine with other better performing RPC methods, like shared memory.  Also if there is a reason for us to have a full blown RPC server I would have no problem with that, but I don't see a reason for it. What we are doing appears to just need single threaded point to point communication.

 - Bobby

    On Monday, June 27, 2016 9:24 AM, Aljoscha Krettek <al...@apache.org> wrote:


 Thanks Kenn for expanding on your previous mail. I now have a better idea
of why we need this.

Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are probably
best suited for the task. Both of these provide a way for generating
serializers as well as for specifying an RPC interface. Avro and
FlatBuffers are only dealing in serializers and we would have to roll our
own RPC system on top of these. (It seems the gRPC folks have some work
going on about integrating support for FlatBuffers but not sure when this
is going to be done: https://github.com/grpc/grpc/issues/5438).

From the description and benchmarks FlatBuffers looks very nice, if it
really comes with a huge performance increase we might have to consider
using it with our own RPC instead of Thrift/gRPC+ProtoBuf3. This would mean
some overhead, however.

I would suggest to do some proof-of-concept implementations with both
Thrift and gPRC+ProtoBuf3 and see how it compares to the baseline (the
current implementation where we just directly call methods on the DoFn and
the DoFn calls methods on the outside directly.). This wouldn't have to
create a full stack, just enough to see how interaction with the DoFn would
work for the different systems.

On Wed, 22 Jun 2016 at 23:00 Kenneth Knowles <kl...@google.com.invalid> wrote:

> I wanted to say a bit more to clarify and enliven this discussion. My use
> of the term "data plane" may have been confusing. I didn't mean to focus it
> quite so much on the encoded elements. What I meant to discuss was the
> entirety of performance-sensitive interactions between the runner and
> user-defined functions. So let's drop the implied control/data distinction
> and just talk about the whole interface.
>
> At the risk of writing at length about something everyone knows... the
> motivation for the Fn API is this: we have a few types of user-definable
> functions (UDFs) that occur in pipelines, and we need to invoke them in a
> language-independent manner. These are DoFn, CombineFn, WindowFn,
> BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.
>
> I will show a bad idea: Take the interfaces of the above functions (minus
> Coder, which is special) and just turn them into RPC interfaces, and the
> SDK's job is just to be a trivial or near-trivial bridge from RPC to
> language-specific method calls. This is a bad proposal, but hopefully helps
> to show issues such as:
>
>  - How and when do we deserialize user code / launch a container? (my bad
> idea above doesn't answer; probably too often!)
>  - How and when do we encode/decode elements? (my bad idea above would
> require it between every UDF)
>  - How do we manage calls that are more than simply a stream of elements in
> a bundle? (example: side inputs)
>
> Any Fn API is required to have the same semantics as this simple proposal,
> but should achieve it with superior performance. I'll leave off the details
> since I am not authoring them personally. But let's assume as a baseline
> the approach of executing a fused stage of same-language UDFs in a row
> without any encoding/decoding or RPC, and making a single RPC call per
> bundle (ignoring amortized round trips for streaming bytes).
>
> I gather from this thread these questions (which I may be interpreting
> wrong; apologies if so) and I would like to answer them relative to this
> design sketch:
>
> Q: Since we have one RPC per bundle and it goes through the whole fused
> stage, and we have a whole stream of elements per call, doesn't the data
> dominate the envelope?
> A: In streaming executions, bundles can be very small, so the data will not
> necessarily dominate.
>
> Q: Do we really need structured messages? Perhaps byte streams with fairly
> trivial metadata suffice and we can just hand roll it?
> A: I think that schematized tech is well-proven for adaptability and it is
> also handy for code gen, regardless of performance. So to me the question
> is whether or not we need structured messages at all, or if we can model
> every high throughput communication as coder-encoded streams. I think that
> things like commits to state, acknowledgements of timer firings, pull-based
> requests like side inputs are probably best expressed via a schema. But
> maybe I am overlooking some design ideas.
>
> Q: How will side inputs arrive?
> A: This API is really designed to be pull-based, so it sort of implies a
> great many small RPCs (and caching).
>
> I'm sure I've left off some discussion points, and maybe oversimplified
> some things, but does this answer the questions somewhat? Does this clarify
> the suggested choices of tech? Do you still think we don't need them?
>
> Kenn
>
> On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
> wrote:
>
> > In storm we use JSON as the default communication between shell bolts and
> > shell spouts, which allows for APIs in non JVM languages. It works rather
> > well.  That being said it is also slow, and we made it a plugin so others
> > could make their own, faster, implementations.  For storm both the data
> and
> > the control are serialized to JSON, so I am not sure how much of that is
> > control and how much of it is the data that makes it slow.  I personally
> > would like to see a simple benchmark that implements the basic protocol
> > between the two so we can actually have a more numeric comparison.  As
> well
> > as any pain that someone experienced trying to implement even a proof of
> > concept.
> >
> > I agree with Amit too that long term we may want to think about
> supporting
> > structured data, and rely less on POJOs.  It allows for a lot of
> > optimizations in addition to having out of the box support for
> > serializing/de-serializing them in another language. But perhaps that is
> > more of a layer that sits on top of beam instead, because a lot of the
> > optimizations really make the most since in a declarative DSL like
> context.
> >
> >  - Bobby
> >
> >    On Saturday, June 18, 2016 6:56 AM, Amit Sela <am...@gmail.com>
> > wrote:
> >
> >
> >  My +1 for JSON was for the fact that it's common enough and simpler than
> > Protbuff/Avro/Thrift, and I would guess that (almost) all languages
> > acknowledge it, though I might be wrong here.
> >
> > As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but
> the
> > "hardest" thing I had to do to get it working with Spark was to register
> > 3rd party implementations for Guava Immutable collections. And I honestly
> > don't know if there is one framework that covers everything in all
> (common)
> > languages.
> >
> > Finally, if I understand correctly, the suggestion is to transmit the
> data
> > as bytes with the appropriate coders, correct ? For the new Spark for
> > example, they use Encoders
> > <
> >
> https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
> > >
> > that have an internal schema and allows the engine to avoid
> > deserializations (and other optimizations) using this schema. So while
> the
> > current version of the Spark runner actually transforms objects into
> bytes
> > prior to shuffle, that might not be the best implementation for the next
> > generation of the runner...
> >
> > This is how I see things from my pretty modest experience with
> > serialization frameworks. Please correct me if/where I might be wrong.
> >
> > Thanks,
> > Amit
> >
> > On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> >
> > > In the Runner API proposal doc, there are 10+ different types with
> > several
> > > fields each.
> > > Is it important to have a code generator for the schema?
> > > * simplify the SDK development process
> > > * reduce errors due to differences in custom implementation
> > >
> > > I'm not familiar with tool(s) which can take a JSON schema (e.g.
> > > http://json-schema.org/) and generate code in multiple languages.
> > Anyone?
> > >
> > >
> > > For the Data Plane API, a Runner and SDK must be able to encode
> elements
> > > such as WindowedValue and KVs in such a way that both sides can
> interpret
> > > them. For example, a Runner will be required to implement GBK so it
> must
> > be
> > > able to read the windowing information from the "bytes" transmitted,
> > > additionally it will need to be able to split KV<K, V> records apart
> and
> > > recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant
> > way
> > > of encoding things, the Data Plane API will transmit "bytes" with the
> > > element boundaries encoded in some way. Aljoscha, I agree with you
> that a
> > > good choice for transmitting bytes between VMs/languages is very
> > important.
> > > Even though we are still transmitting mostly "bytes", error handling &
> > > connection handling are still important.
> > > For example, if we were to use gRPC and proto3 with a bidirectional
> > stream
> > > based API, we would get:
> > > the Runner and SDK can both push data both ways (stream from/to GBK,
> > stream
> > > from/to state)
> > > error handling
> > > code generation of client libraries
> > > HTTP/2
> > >
> > > As for the encoding, any SDK can choose any serialization it wants such
> > as
> > > Kryo but to get interoperability with other languages that would
> require
> > > others to implement parts of the Kryo serialization spec to be able to
> > > interpret the "bytes". Thus certain types like KV & WindowedValue
> should
> > be
> > > encoded in a way which allows for this interoperability.
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <am...@gmail.com>
> wrote:
> > >
> > > > +1 on Aljoscha comment, not sure where's the benefit in having a
> > > > "schematic" serialization.
> > > >
> > > > I know that Spark and I think Flink as well, use Kryo
> > > > <https://github.com/EsotericSoftware/kryo> for serialization (to be
> > > > accurate it's Chill <https://github.com/twitter/chill> for Spark)
> and
> > I
> > > > found it very impressive even comparing to "manual" serializations,
> > > >  i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> > > > primitives..
> > > > In addition it clearly supports Java and Scala, and there are 3rd
> party
> > > > libraries for Clojure and Objective-C.
> > > >
> > > > I guess my bottom-line here agrees with Kenneth - performance and
> > > > interoperability - but I'm just not sure if schema based serializers
> > are
> > > > *always* the fastest.
> > > >
> > > > As for pipeline serialization, since performance is not the main
> issue,
> > > and
> > > > I think usability would be very important, I say +1 for JSON.
> > > >
> > > > For anyone who spent sometime on benchmarking serialization
> libraries,
> > > know
> > > > is the time to speak up ;)
> > > >
> > > > Thanks,
> > > > Amit
> > > >
> > > > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <
> aljoscha@apache.org
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > am I correct in assuming that the transmitted envelopes would
> mostly
> > > > > contain coder-serialized values? If so, wouldn't the header of an
> > > > envelope
> > > > > just be the number of contained bytes and number of values? I'm
> > > probably
> > > > > missing something but with these assumptions I don't see the
> benefit
> > of
> > > > > using something like Avro/Thrift/Protobuf for serializing the
> > > main-input
> > > > > value envelopes. We would just need a system that can send byte
> data
> > > > really
> > > > > fast between languages/VMs.
> > > > >
> > > > > By the way, another interesting question (at least for me) is how
> > other
> > > > > data, such as side-inputs, is going to arrive at the DoFn if we
> want
> > to
> > > > > support a general interface for different languages.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles
> <klk@google.com.invalid
> > >
> > > > > wrote:
> > > > >
> > > > > > (Apologies for the formatting)
> > > > > >
> > > > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <
> klk@google.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello everyone!
> > > > > > >
> > > > > > > We are busily working on a Runner API (for building and
> > > transmitting
> > > > > > > pipelines)
> > > > > > > and a Fn API (for invoking user-defined functions found within
> > > > > pipelines)
> > > > > > > as
> > > > > > > outlined in the Beam technical vision [1]. Both of these
> require
> > a
> > > > > > > language-independent serialization technology for
> > interoperability
> > > > > > between
> > > > > > > SDKs
> > > > > > > and runners.
> > > > > > >
> > > > > > > The Fn API includes a high-bandwidth data plane where bundles
> are
> > > > > > > transmitted
> > > > > > > via some serialization/RPC envelope (inside the envelope, the
> > > stream
> > > > of
> > > > > > > elements is encoded with a coder) to transfer bundles between
> the
> > > > > runner
> > > > > > > and
> > > > > > > the SDK, so performance is extremely important. There are many
> > > > choices
> > > > > > for
> > > > > > > high
> > > > > > > performance serialization, and we would like to start the
> > > > conversation
> > > > > > > about
> > > > > > > what serialization technology is best for Beam.
> > > > > > >
> > > > > > > The goal of this discussion is to arrive at consensus on the
> > > > question:
> > > > > > > What
> > > > > > > serialization technology should we use for the data plane
> > envelope
> > > of
> > > > > the
> > > > > > > Fn
> > > > > > > API?
> > > > > > >
> > > > > > > To facilitate community discussion, we looked at the available
> > > > > > > technologies and
> > > > > > > tried to narrow the choices based on three criteria:
> > > > > > >
> > > > > > >  - Performance: What is the size of serialized data? How do we
> > > expect
> > > > > the
> > > > > > >    technology to affect pipeline speed and cost? etc
> > > > > > >
> > > > > > >  - Language support: Does the technology support the most
> > > widespread
> > > > > > > language
> > > > > > >    for data processing? Does it have a vibrant ecosystem of
> > > > contributed
> > > > > > >    language bindings? etc
> > > > > > >
> > > > > > >  - Community: What is the adoption of the technology? How
> mature
> > is
> > > > it?
> > > > > > > How
> > > > > > >    active is development? How is the documentation? etc
> > > > > > >
> > > > > > > Given these criteria, we came up with four technologies that
> are
> > > good
> > > > > > > contenders. All have similar & adequate schema capabilities.
> > > > > > >
> > > > > > >  - Apache Avro: Does not require code gen, but embedding the
> > schema
> > > > in
> > > > > > the
> > > > > > > data
> > > > > > >    could be an issue. Very popular.
> > > > > > >
> > > > > > >  - Apache Thrift: Probably a bit faster and compact than Avro.
> A
> > > huge
> > > > > > > number of
> > > > > > >    language supported.
> > > > > > >
> > > > > > >  - Protocol Buffers 3: Incorporates the lessons that Google has
> > > > learned
> > > > > > > through
> > > > > > >    long-term use of Protocol Buffers.
> > > > > > >
> > > > > > >  - FlatBuffers: Some benchmarks imply great performance from
> the
> > > > > > zero-copy
> > > > > > > mmap
> > > > > > >    idea. We would need to run representative experiments.
> > > > > > >
> > > > > > > I want to emphasize that this is a community decision, and this
> > > > thread
> > > > > is
> > > > > > > just
> > > > > > > the conversation starter for us all to weigh in. We just wanted
> > to
> > > do
> > > > > > some
> > > > > > > legwork to focus the discussion if we could.
> > > > > > >
> > > > > > > And there's a minor follow-up question: Once we settle here, is
> > > that
> > > > > > > technology
> > > > > > > also suitable for the low-bandwidth Runner API for defining
> > > > pipelines,
> > > > > or
> > > > > > > does
> > > > > > > anyone think we need to consider a second technology (like
> JSON)
> > > for
> > > > > > > usability
> > > > > > > reasons?
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> >
>


  



  

Re: [DISCUSS] Beam data plane serialization tech

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
I think we will want an RPC server because the types of interactions
between the runner and SDK aren't that simple:
process some work (for some definition of work)
split that source
get progress of that source
SDK requests a side input
SDK wants to read/write state
...

As for security, I think that we could have a trusted auth mode (all
communications are done in a trusted environment, e.g. localhost to
localhost communication, private network) and also potentially support an
OAuth2 client credentials grant being used as a bearer token.

The reason I'm suggesting this is because I believe we should have a docker
container dedicated to the users code + SDK which uses the Fn API to
communicate with the runner.
...

On Tue, Jun 28, 2016 at 7:17 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
wrote:

> What type of RPC do we really need?  Is this envisioned as a many to one
> connection situation?  What other processes exactly are going to be
> attaching to this RPC server and sending/receiving data?  If the non-java
> process opens up a port and listens there are all kinds of potential
> security concerns that will need to be addressed (encryption,
> authentication, authorization, etc.).  Using just stdin, stdout, and stderr
> is standard everywhere that java runs.  Has some decent security built in,
> and all we need is a protocol to wrap the payload in, which is what I
> thought we were talking about.
>  I am fine with other better performing RPC methods, like shared memory.
> Also if there is a reason for us to have a full blown RPC server I would
> have no problem with that, but I don't see a reason for it. What we are
> doing appears to just need single threaded point to point communication.
>
>  - Bobby
>
>     On Monday, June 27, 2016 9:24 AM, Aljoscha Krettek <
> aljoscha@apache.org> wrote:
>
>
>  Thanks Kenn for expanding on your previous mail. I now have a better idea
> of why we need this.
>
> Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are probably
> best suited for the task. Both of these provide a way for generating
> serializers as well as for specifying an RPC interface. Avro and
> FlatBuffers are only dealing in serializers and we would have to roll our
> own RPC system on top of these. (It seems the gRPC folks have some work
> going on about integrating support for FlatBuffers but not sure when this
> is going to be done: https://github.com/grpc/grpc/issues/5438).
>
> From the description and benchmarks FlatBuffers looks very nice, if it
> really comes with a huge performance increase we might have to consider
> using it with our own RPC instead of Thrift/gRPC+ProtoBuf3. This would mean
> some overhead, however.
>
> I would suggest to do some proof-of-concept implementations with both
> Thrift and gPRC+ProtoBuf3 and see how it compares to the baseline (the
> current implementation where we just directly call methods on the DoFn and
> the DoFn calls methods on the outside directly.). This wouldn't have to
> create a full stack, just enough to see how interaction with the DoFn would
> work for the different systems.
>
> On Wed, 22 Jun 2016 at 23:00 Kenneth Knowles <kl...@google.com.invalid>
> wrote:
>
> > I wanted to say a bit more to clarify and enliven this discussion. My use
> > of the term "data plane" may have been confusing. I didn't mean to focus
> it
> > quite so much on the encoded elements. What I meant to discuss was the
> > entirety of performance-sensitive interactions between the runner and
> > user-defined functions. So let's drop the implied control/data
> distinction
> > and just talk about the whole interface.
> >
> > At the risk of writing at length about something everyone knows... the
> > motivation for the Fn API is this: we have a few types of user-definable
> > functions (UDFs) that occur in pipelines, and we need to invoke them in a
> > language-independent manner. These are DoFn, CombineFn, WindowFn,
> > BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.
> >
> > I will show a bad idea: Take the interfaces of the above functions (minus
> > Coder, which is special) and just turn them into RPC interfaces, and the
> > SDK's job is just to be a trivial or near-trivial bridge from RPC to
> > language-specific method calls. This is a bad proposal, but hopefully
> helps
> > to show issues such as:
> >
> >  - How and when do we deserialize user code / launch a container? (my bad
> > idea above doesn't answer; probably too often!)
> >  - How and when do we encode/decode elements? (my bad idea above would
> > require it between every UDF)
> >  - How do we manage calls that are more than simply a stream of elements
> in
> > a bundle? (example: side inputs)
> >
> > Any Fn API is required to have the same semantics as this simple
> proposal,
> > but should achieve it with superior performance. I'll leave off the
> details
> > since I am not authoring them personally. But let's assume as a baseline
> > the approach of executing a fused stage of same-language UDFs in a row
> > without any encoding/decoding or RPC, and making a single RPC call per
> > bundle (ignoring amortized round trips for streaming bytes).
> >
> > I gather from this thread these questions (which I may be interpreting
> > wrong; apologies if so) and I would like to answer them relative to this
> > design sketch:
> >
> > Q: Since we have one RPC per bundle and it goes through the whole fused
> > stage, and we have a whole stream of elements per call, doesn't the data
> > dominate the envelope?
> > A: In streaming executions, bundles can be very small, so the data will
> not
> > necessarily dominate.
> >
> > Q: Do we really need structured messages? Perhaps byte streams with
> fairly
> > trivial metadata suffice and we can just hand roll it?
> > A: I think that schematized tech is well-proven for adaptability and it
> is
> > also handy for code gen, regardless of performance. So to me the question
> > is whether or not we need structured messages at all, or if we can model
> > every high throughput communication as coder-encoded streams. I think
> that
> > things like commits to state, acknowledgements of timer firings,
> pull-based
> > requests like side inputs are probably best expressed via a schema. But
> > maybe I am overlooking some design ideas.
> >
> > Q: How will side inputs arrive?
> > A: This API is really designed to be pull-based, so it sort of implies a
> > great many small RPCs (and caching).
> >
> > I'm sure I've left off some discussion points, and maybe oversimplified
> > some things, but does this answer the questions somewhat? Does this
> clarify
> > the suggested choices of tech? Do you still think we don't need them?
> >
> > Kenn
> >
> > On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <evans@yahoo-inc.com.invalid
> >
> > wrote:
> >
> > > In storm we use JSON as the default communication between shell bolts
> and
> > > shell spouts, which allows for APIs in non JVM languages. It works
> rather
> > > well.  That being said it is also slow, and we made it a plugin so
> others
> > > could make their own, faster, implementations.  For storm both the data
> > and
> > > the control are serialized to JSON, so I am not sure how much of that
> is
> > > control and how much of it is the data that makes it slow.  I
> personally
> > > would like to see a simple benchmark that implements the basic protocol
> > > between the two so we can actually have a more numeric comparison.  As
> > well
> > > as any pain that someone experienced trying to implement even a proof
> of
> > > concept.
> > >
> > > I agree with Amit too that long term we may want to think about
> > supporting
> > > structured data, and rely less on POJOs.  It allows for a lot of
> > > optimizations in addition to having out of the box support for
> > > serializing/de-serializing them in another language. But perhaps that
> is
> > > more of a layer that sits on top of beam instead, because a lot of the
> > > optimizations really make the most since in a declarative DSL like
> > context.
> > >
> > >  - Bobby
> > >
> > >    On Saturday, June 18, 2016 6:56 AM, Amit Sela <amitsela33@gmail.com
> >
> > > wrote:
> > >
> > >
> > >  My +1 for JSON was for the fact that it's common enough and simpler
> than
> > > Protbuff/Avro/Thrift, and I would guess that (almost) all languages
> > > acknowledge it, though I might be wrong here.
> > >
> > > As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but
> > the
> > > "hardest" thing I had to do to get it working with Spark was to
> register
> > > 3rd party implementations for Guava Immutable collections. And I
> honestly
> > > don't know if there is one framework that covers everything in all
> > (common)
> > > languages.
> > >
> > > Finally, if I understand correctly, the suggestion is to transmit the
> > data
> > > as bytes with the appropriate coders, correct ? For the new Spark for
> > > example, they use Encoders
> > > <
> > >
> >
> https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
> > > >
> > > that have an internal schema and allows the engine to avoid
> > > deserializations (and other optimizations) using this schema. So while
> > the
> > > current version of the Spark runner actually transforms objects into
> > bytes
> > > prior to shuffle, that might not be the best implementation for the
> next
> > > generation of the runner...
> > >
> > > This is how I see things from my pretty modest experience with
> > > serialization frameworks. Please correct me if/where I might be wrong.
> > >
> > > Thanks,
> > > Amit
> > >
> > > On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <lc...@google.com.invalid>
> > > wrote:
> > >
> > > > In the Runner API proposal doc, there are 10+ different types with
> > > several
> > > > fields each.
> > > > Is it important to have a code generator for the schema?
> > > > * simplify the SDK development process
> > > > * reduce errors due to differences in custom implementation
> > > >
> > > > I'm not familiar with tool(s) which can take a JSON schema (e.g.
> > > > http://json-schema.org/) and generate code in multiple languages.
> > > Anyone?
> > > >
> > > >
> > > > For the Data Plane API, a Runner and SDK must be able to encode
> > elements
> > > > such as WindowedValue and KVs in such a way that both sides can
> > interpret
> > > > them. For example, a Runner will be required to implement GBK so it
> > must
> > > be
> > > > able to read the windowing information from the "bytes" transmitted,
> > > > additionally it will need to be able to split KV<K, V> records apart
> > and
> > > > recreate KV<K, Iterable<V>> for the SDK. Since Coders are the
> dominant
> > > way
> > > > of encoding things, the Data Plane API will transmit "bytes" with the
> > > > element boundaries encoded in some way. Aljoscha, I agree with you
> > that a
> > > > good choice for transmitting bytes between VMs/languages is very
> > > important.
> > > > Even though we are still transmitting mostly "bytes", error handling
> &
> > > > connection handling are still important.
> > > > For example, if we were to use gRPC and proto3 with a bidirectional
> > > stream
> > > > based API, we would get:
> > > > the Runner and SDK can both push data both ways (stream from/to GBK,
> > > stream
> > > > from/to state)
> > > > error handling
> > > > code generation of client libraries
> > > > HTTP/2
> > > >
> > > > As for the encoding, any SDK can choose any serialization it wants
> such
> > > as
> > > > Kryo but to get interoperability with other languages that would
> > require
> > > > others to implement parts of the Kryo serialization spec to be able
> to
> > > > interpret the "bytes". Thus certain types like KV & WindowedValue
> > should
> > > be
> > > > encoded in a way which allows for this interoperability.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <am...@gmail.com>
> > wrote:
> > > >
> > > > > +1 on Aljoscha comment, not sure where's the benefit in having a
> > > > > "schematic" serialization.
> > > > >
> > > > > I know that Spark and I think Flink as well, use Kryo
> > > > > <https://github.com/EsotericSoftware/kryo> for serialization (to
> be
> > > > > accurate it's Chill <https://github.com/twitter/chill> for Spark)
> > and
> > > I
> > > > > found it very impressive even comparing to "manual" serializations,
> > > > >  i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> > > > > primitives..
> > > > > In addition it clearly supports Java and Scala, and there are 3rd
> > party
> > > > > libraries for Clojure and Objective-C.
> > > > >
> > > > > I guess my bottom-line here agrees with Kenneth - performance and
> > > > > interoperability - but I'm just not sure if schema based
> serializers
> > > are
> > > > > *always* the fastest.
> > > > >
> > > > > As for pipeline serialization, since performance is not the main
> > issue,
> > > > and
> > > > > I think usability would be very important, I say +1 for JSON.
> > > > >
> > > > > For anyone who spent sometime on benchmarking serialization
> > libraries,
> > > > know
> > > > > is the time to speak up ;)
> > > > >
> > > > > Thanks,
> > > > > Amit
> > > > >
> > > > > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <
> > aljoscha@apache.org
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > am I correct in assuming that the transmitted envelopes would
> > mostly
> > > > > > contain coder-serialized values? If so, wouldn't the header of an
> > > > > envelope
> > > > > > just be the number of contained bytes and number of values? I'm
> > > > probably
> > > > > > missing something but with these assumptions I don't see the
> > benefit
> > > of
> > > > > > using something like Avro/Thrift/Protobuf for serializing the
> > > > main-input
> > > > > > value envelopes. We would just need a system that can send byte
> > data
> > > > > really
> > > > > > fast between languages/VMs.
> > > > > >
> > > > > > By the way, another interesting question (at least for me) is how
> > > other
> > > > > > data, such as side-inputs, is going to arrive at the DoFn if we
> > want
> > > to
> > > > > > support a general interface for different languages.
> > > > > >
> > > > > > Cheers,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles
> > <klk@google.com.invalid
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > (Apologies for the formatting)
> > > > > > >
> > > > > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <
> > klk@google.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello everyone!
> > > > > > > >
> > > > > > > > We are busily working on a Runner API (for building and
> > > > transmitting
> > > > > > > > pipelines)
> > > > > > > > and a Fn API (for invoking user-defined functions found
> within
> > > > > > pipelines)
> > > > > > > > as
> > > > > > > > outlined in the Beam technical vision [1]. Both of these
> > require
> > > a
> > > > > > > > language-independent serialization technology for
> > > interoperability
> > > > > > > between
> > > > > > > > SDKs
> > > > > > > > and runners.
> > > > > > > >
> > > > > > > > The Fn API includes a high-bandwidth data plane where bundles
> > are
> > > > > > > > transmitted
> > > > > > > > via some serialization/RPC envelope (inside the envelope, the
> > > > stream
> > > > > of
> > > > > > > > elements is encoded with a coder) to transfer bundles between
> > the
> > > > > > runner
> > > > > > > > and
> > > > > > > > the SDK, so performance is extremely important. There are
> many
> > > > > choices
> > > > > > > for
> > > > > > > > high
> > > > > > > > performance serialization, and we would like to start the
> > > > > conversation
> > > > > > > > about
> > > > > > > > what serialization technology is best for Beam.
> > > > > > > >
> > > > > > > > The goal of this discussion is to arrive at consensus on the
> > > > > question:
> > > > > > > > What
> > > > > > > > serialization technology should we use for the data plane
> > > envelope
> > > > of
> > > > > > the
> > > > > > > > Fn
> > > > > > > > API?
> > > > > > > >
> > > > > > > > To facilitate community discussion, we looked at the
> available
> > > > > > > > technologies and
> > > > > > > > tried to narrow the choices based on three criteria:
> > > > > > > >
> > > > > > > >  - Performance: What is the size of serialized data? How do
> we
> > > > expect
> > > > > > the
> > > > > > > >    technology to affect pipeline speed and cost? etc
> > > > > > > >
> > > > > > > >  - Language support: Does the technology support the most
> > > > widespread
> > > > > > > > language
> > > > > > > >    for data processing? Does it have a vibrant ecosystem of
> > > > > contributed
> > > > > > > >    language bindings? etc
> > > > > > > >
> > > > > > > >  - Community: What is the adoption of the technology? How
> > mature
> > > is
> > > > > it?
> > > > > > > > How
> > > > > > > >    active is development? How is the documentation? etc
> > > > > > > >
> > > > > > > > Given these criteria, we came up with four technologies that
> > are
> > > > good
> > > > > > > > contenders. All have similar & adequate schema capabilities.
> > > > > > > >
> > > > > > > >  - Apache Avro: Does not require code gen, but embedding the
> > > schema
> > > > > in
> > > > > > > the
> > > > > > > > data
> > > > > > > >    could be an issue. Very popular.
> > > > > > > >
> > > > > > > >  - Apache Thrift: Probably a bit faster and compact than
> Avro.
> > A
> > > > huge
> > > > > > > > number of
> > > > > > > >    language supported.
> > > > > > > >
> > > > > > > >  - Protocol Buffers 3: Incorporates the lessons that Google
> has
> > > > > learned
> > > > > > > > through
> > > > > > > >    long-term use of Protocol Buffers.
> > > > > > > >
> > > > > > > >  - FlatBuffers: Some benchmarks imply great performance from
> > the
> > > > > > > zero-copy
> > > > > > > > mmap
> > > > > > > >    idea. We would need to run representative experiments.
> > > > > > > >
> > > > > > > > I want to emphasize that this is a community decision, and
> this
> > > > > thread
> > > > > > is
> > > > > > > > just
> > > > > > > > the conversation starter for us all to weigh in. We just
> wanted
> > > to
> > > > do
> > > > > > > some
> > > > > > > > legwork to focus the discussion if we could.
> > > > > > > >
> > > > > > > > And there's a minor follow-up question: Once we settle here,
> is
> > > > that
> > > > > > > > technology
> > > > > > > > also suitable for the low-bandwidth Runner API for defining
> > > > > pipelines,
> > > > > > or
> > > > > > > > does
> > > > > > > > anyone think we need to consider a second technology (like
> > JSON)
> > > > for
> > > > > > > > usability
> > > > > > > > reasons?
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > >
> >
>
>
>
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
What type of RPC do we really need?  Is this envisioned as a many to one connection situation?  What other processes exactly are going to be attaching to this RPC server and sending/receiving data?  If the non-java process opens up a port and listens there are all kinds of potential security concerns that will need to be addressed (encryption, authentication, authorization, etc.).  Using just stdin, stdout, and stderr is standard everywhere that java runs.  Has some decent security built in, and all we need is a protocol to wrap the payload in, which is what I thought we were talking about.
 I am fine with other better performing RPC methods, like shared memory.  Also if there is a reason for us to have a full blown RPC server I would have no problem with that, but I don't see a reason for it. What we are doing appears to just need single threaded point to point communication. 

 - Bobby 

    On Monday, June 27, 2016 9:24 AM, Aljoscha Krettek <al...@apache.org> wrote:
 

 Thanks Kenn for expanding on your previous mail. I now have a better idea
of why we need this.

Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are probably
best suited for the task. Both of these provide a way for generating
serializers as well as for specifying an RPC interface. Avro and
FlatBuffers are only dealing in serializers and we would have to roll our
own RPC system on top of these. (It seems the gRPC folks have some work
going on about integrating support for FlatBuffers but not sure when this
is going to be done: https://github.com/grpc/grpc/issues/5438).

From the description and benchmarks FlatBuffers looks very nice, if it
really comes with a huge performance increase we might have to consider
using it with our own RPC instead of Thrift/gRPC+ProtoBuf3. This would mean
some overhead, however.

I would suggest to do some proof-of-concept implementations with both
Thrift and gPRC+ProtoBuf3 and see how it compares to the baseline (the
current implementation where we just directly call methods on the DoFn and
the DoFn calls methods on the outside directly.). This wouldn't have to
create a full stack, just enough to see how interaction with the DoFn would
work for the different systems.

On Wed, 22 Jun 2016 at 23:00 Kenneth Knowles <kl...@google.com.invalid> wrote:

> I wanted to say a bit more to clarify and enliven this discussion. My use
> of the term "data plane" may have been confusing. I didn't mean to focus it
> quite so much on the encoded elements. What I meant to discuss was the
> entirety of performance-sensitive interactions between the runner and
> user-defined functions. So let's drop the implied control/data distinction
> and just talk about the whole interface.
>
> At the risk of writing at length about something everyone knows... the
> motivation for the Fn API is this: we have a few types of user-definable
> functions (UDFs) that occur in pipelines, and we need to invoke them in a
> language-independent manner. These are DoFn, CombineFn, WindowFn,
> BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.
>
> I will show a bad idea: Take the interfaces of the above functions (minus
> Coder, which is special) and just turn them into RPC interfaces, and the
> SDK's job is just to be a trivial or near-trivial bridge from RPC to
> language-specific method calls. This is a bad proposal, but hopefully helps
> to show issues such as:
>
>  - How and when do we deserialize user code / launch a container? (my bad
> idea above doesn't answer; probably too often!)
>  - How and when do we encode/decode elements? (my bad idea above would
> require it between every UDF)
>  - How do we manage calls that are more than simply a stream of elements in
> a bundle? (example: side inputs)
>
> Any Fn API is required to have the same semantics as this simple proposal,
> but should achieve it with superior performance. I'll leave off the details
> since I am not authoring them personally. But let's assume as a baseline
> the approach of executing a fused stage of same-language UDFs in a row
> without any encoding/decoding or RPC, and making a single RPC call per
> bundle (ignoring amortized round trips for streaming bytes).
>
> I gather from this thread these questions (which I may be interpreting
> wrong; apologies if so) and I would like to answer them relative to this
> design sketch:
>
> Q: Since we have one RPC per bundle and it goes through the whole fused
> stage, and we have a whole stream of elements per call, doesn't the data
> dominate the envelope?
> A: In streaming executions, bundles can be very small, so the data will not
> necessarily dominate.
>
> Q: Do we really need structured messages? Perhaps byte streams with fairly
> trivial metadata suffice and we can just hand roll it?
> A: I think that schematized tech is well-proven for adaptability and it is
> also handy for code gen, regardless of performance. So to me the question
> is whether or not we need structured messages at all, or if we can model
> every high throughput communication as coder-encoded streams. I think that
> things like commits to state, acknowledgements of timer firings, pull-based
> requests like side inputs are probably best expressed via a schema. But
> maybe I am overlooking some design ideas.
>
> Q: How will side inputs arrive?
> A: This API is really designed to be pull-based, so it sort of implies a
> great many small RPCs (and caching).
>
> I'm sure I've left off some discussion points, and maybe oversimplified
> some things, but does this answer the questions somewhat? Does this clarify
> the suggested choices of tech? Do you still think we don't need them?
>
> Kenn
>
> On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
> wrote:
>
> > In storm we use JSON as the default communication between shell bolts and
> > shell spouts, which allows for APIs in non JVM languages. It works rather
> > well.  That being said it is also slow, and we made it a plugin so others
> > could make their own, faster, implementations.  For storm both the data
> and
> > the control are serialized to JSON, so I am not sure how much of that is
> > control and how much of it is the data that makes it slow.  I personally
> > would like to see a simple benchmark that implements the basic protocol
> > between the two so we can actually have a more numeric comparison.  As
> well
> > as any pain that someone experienced trying to implement even a proof of
> > concept.
> >
> > I agree with Amit too that long term we may want to think about
> supporting
> > structured data, and rely less on POJOs.  It allows for a lot of
> > optimizations in addition to having out of the box support for
> > serializing/de-serializing them in another language. But perhaps that is
> > more of a layer that sits on top of beam instead, because a lot of the
> > optimizations really make the most since in a declarative DSL like
> context.
> >
> >  - Bobby
> >
> >    On Saturday, June 18, 2016 6:56 AM, Amit Sela <am...@gmail.com>
> > wrote:
> >
> >
> >  My +1 for JSON was for the fact that it's common enough and simpler than
> > Protbuff/Avro/Thrift, and I would guess that (almost) all languages
> > acknowledge it, though I might be wrong here.
> >
> > As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but
> the
> > "hardest" thing I had to do to get it working with Spark was to register
> > 3rd party implementations for Guava Immutable collections. And I honestly
> > don't know if there is one framework that covers everything in all
> (common)
> > languages.
> >
> > Finally, if I understand correctly, the suggestion is to transmit the
> data
> > as bytes with the appropriate coders, correct ? For the new Spark for
> > example, they use Encoders
> > <
> >
> https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
> > >
> > that have an internal schema and allows the engine to avoid
> > deserializations (and other optimizations) using this schema. So while
> the
> > current version of the Spark runner actually transforms objects into
> bytes
> > prior to shuffle, that might not be the best implementation for the next
> > generation of the runner...
> >
> > This is how I see things from my pretty modest experience with
> > serialization frameworks. Please correct me if/where I might be wrong.
> >
> > Thanks,
> > Amit
> >
> > On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> >
> > > In the Runner API proposal doc, there are 10+ different types with
> > several
> > > fields each.
> > > Is it important to have a code generator for the schema?
> > > * simplify the SDK development process
> > > * reduce errors due to differences in custom implementation
> > >
> > > I'm not familiar with tool(s) which can take a JSON schema (e.g.
> > > http://json-schema.org/) and generate code in multiple languages.
> > Anyone?
> > >
> > >
> > > For the Data Plane API, a Runner and SDK must be able to encode
> elements
> > > such as WindowedValue and KVs in such a way that both sides can
> interpret
> > > them. For example, a Runner will be required to implement GBK so it
> must
> > be
> > > able to read the windowing information from the "bytes" transmitted,
> > > additionally it will need to be able to split KV<K, V> records apart
> and
> > > recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant
> > way
> > > of encoding things, the Data Plane API will transmit "bytes" with the
> > > element boundaries encoded in some way. Aljoscha, I agree with you
> that a
> > > good choice for transmitting bytes between VMs/languages is very
> > important.
> > > Even though we are still transmitting mostly "bytes", error handling &
> > > connection handling are still important.
> > > For example, if we were to use gRPC and proto3 with a bidirectional
> > stream
> > > based API, we would get:
> > > the Runner and SDK can both push data both ways (stream from/to GBK,
> > stream
> > > from/to state)
> > > error handling
> > > code generation of client libraries
> > > HTTP/2
> > >
> > > As for the encoding, any SDK can choose any serialization it wants such
> > as
> > > Kryo but to get interoperability with other languages that would
> require
> > > others to implement parts of the Kryo serialization spec to be able to
> > > interpret the "bytes". Thus certain types like KV & WindowedValue
> should
> > be
> > > encoded in a way which allows for this interoperability.
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <am...@gmail.com>
> wrote:
> > >
> > > > +1 on Aljoscha comment, not sure where's the benefit in having a
> > > > "schematic" serialization.
> > > >
> > > > I know that Spark and I think Flink as well, use Kryo
> > > > <https://github.com/EsotericSoftware/kryo> for serialization (to be
> > > > accurate it's Chill <https://github.com/twitter/chill> for Spark)
> and
> > I
> > > > found it very impressive even comparing to "manual" serializations,
> > > >  i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> > > > primitives..
> > > > In addition it clearly supports Java and Scala, and there are 3rd
> party
> > > > libraries for Clojure and Objective-C.
> > > >
> > > > I guess my bottom-line here agrees with Kenneth - performance and
> > > > interoperability - but I'm just not sure if schema based serializers
> > are
> > > > *always* the fastest.
> > > >
> > > > As for pipeline serialization, since performance is not the main
> issue,
> > > and
> > > > I think usability would be very important, I say +1 for JSON.
> > > >
> > > > For anyone who spent sometime on benchmarking serialization
> libraries,
> > > know
> > > > is the time to speak up ;)
> > > >
> > > > Thanks,
> > > > Amit
> > > >
> > > > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <
> aljoscha@apache.org
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > am I correct in assuming that the transmitted envelopes would
> mostly
> > > > > contain coder-serialized values? If so, wouldn't the header of an
> > > > envelope
> > > > > just be the number of contained bytes and number of values? I'm
> > > probably
> > > > > missing something but with these assumptions I don't see the
> benefit
> > of
> > > > > using something like Avro/Thrift/Protobuf for serializing the
> > > main-input
> > > > > value envelopes. We would just need a system that can send byte
> data
> > > > really
> > > > > fast between languages/VMs.
> > > > >
> > > > > By the way, another interesting question (at least for me) is how
> > other
> > > > > data, such as side-inputs, is going to arrive at the DoFn if we
> want
> > to
> > > > > support a general interface for different languages.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles
> <klk@google.com.invalid
> > >
> > > > > wrote:
> > > > >
> > > > > > (Apologies for the formatting)
> > > > > >
> > > > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <
> klk@google.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello everyone!
> > > > > > >
> > > > > > > We are busily working on a Runner API (for building and
> > > transmitting
> > > > > > > pipelines)
> > > > > > > and a Fn API (for invoking user-defined functions found within
> > > > > pipelines)
> > > > > > > as
> > > > > > > outlined in the Beam technical vision [1]. Both of these
> require
> > a
> > > > > > > language-independent serialization technology for
> > interoperability
> > > > > > between
> > > > > > > SDKs
> > > > > > > and runners.
> > > > > > >
> > > > > > > The Fn API includes a high-bandwidth data plane where bundles
> are
> > > > > > > transmitted
> > > > > > > via some serialization/RPC envelope (inside the envelope, the
> > > stream
> > > > of
> > > > > > > elements is encoded with a coder) to transfer bundles between
> the
> > > > > runner
> > > > > > > and
> > > > > > > the SDK, so performance is extremely important. There are many
> > > > choices
> > > > > > for
> > > > > > > high
> > > > > > > performance serialization, and we would like to start the
> > > > conversation
> > > > > > > about
> > > > > > > what serialization technology is best for Beam.
> > > > > > >
> > > > > > > The goal of this discussion is to arrive at consensus on the
> > > > question:
> > > > > > > What
> > > > > > > serialization technology should we use for the data plane
> > envelope
> > > of
> > > > > the
> > > > > > > Fn
> > > > > > > API?
> > > > > > >
> > > > > > > To facilitate community discussion, we looked at the available
> > > > > > > technologies and
> > > > > > > tried to narrow the choices based on three criteria:
> > > > > > >
> > > > > > >  - Performance: What is the size of serialized data? How do we
> > > expect
> > > > > the
> > > > > > >    technology to affect pipeline speed and cost? etc
> > > > > > >
> > > > > > >  - Language support: Does the technology support the most
> > > widespread
> > > > > > > language
> > > > > > >    for data processing? Does it have a vibrant ecosystem of
> > > > contributed
> > > > > > >    language bindings? etc
> > > > > > >
> > > > > > >  - Community: What is the adoption of the technology? How
> mature
> > is
> > > > it?
> > > > > > > How
> > > > > > >    active is development? How is the documentation? etc
> > > > > > >
> > > > > > > Given these criteria, we came up with four technologies that
> are
> > > good
> > > > > > > contenders. All have similar & adequate schema capabilities.
> > > > > > >
> > > > > > >  - Apache Avro: Does not require code gen, but embedding the
> > schema
> > > > in
> > > > > > the
> > > > > > > data
> > > > > > >    could be an issue. Very popular.
> > > > > > >
> > > > > > >  - Apache Thrift: Probably a bit faster and compact than Avro.
> A
> > > huge
> > > > > > > number of
> > > > > > >    language supported.
> > > > > > >
> > > > > > >  - Protocol Buffers 3: Incorporates the lessons that Google has
> > > > learned
> > > > > > > through
> > > > > > >    long-term use of Protocol Buffers.
> > > > > > >
> > > > > > >  - FlatBuffers: Some benchmarks imply great performance from
> the
> > > > > > zero-copy
> > > > > > > mmap
> > > > > > >    idea. We would need to run representative experiments.
> > > > > > >
> > > > > > > I want to emphasize that this is a community decision, and this
> > > > thread
> > > > > is
> > > > > > > just
> > > > > > > the conversation starter for us all to weigh in. We just wanted
> > to
> > > do
> > > > > > some
> > > > > > > legwork to focus the discussion if we could.
> > > > > > >
> > > > > > > And there's a minor follow-up question: Once we settle here, is
> > > that
> > > > > > > technology
> > > > > > > also suitable for the low-bandwidth Runner API for defining
> > > > pipelines,
> > > > > or
> > > > > > > does
> > > > > > > anyone think we need to consider a second technology (like
> JSON)
> > > for
> > > > > > > usability
> > > > > > > reasons?
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> >
>


  

Re: [DISCUSS] Beam data plane serialization tech

Posted by vikas rk <vi...@gmail.com>.
+1.

You mean sharing definitions between the fn API and runner API? Like the
idea.

-Vikas

On 7 February 2017 at 14:39, Kenneth Knowles <kl...@google.com.invalid> wrote:

> This has lain dormant as I was drawn off to other things. But now I'm
> looping back on this so there are no surprises in my upcoming (third)
> revision to PR #662 [1] to use protocol buffers instead of JSON schema or
> Avro (the two prior versions - now I know what the runner API looks like in
> every format :-)).
>
> Here's the reasoning:
>
> 1. Since the Fn API requires the SDK harness to have protocol buffers
> support, there is no portability to be gained by having a proto-independent
> JSON schema or Avro schema for the Runner API. As currently designed, a
> language will need proto support in order to implement a Beam SDK.
>
> 2. Since proto has a JSON format that can be used for human readability,
> there's not really a usability benefit to using JSON schema and some other
> form of JSON.
>
> 3. Generation of helper libraries for proto is nice versus having a json
> schema, where support for generating POJOs, etc, might be incomplete or
> strange for some languages.
>
> 4. Some of the core generic "graph with stuff on the nodes and edges"
> definitions can be shared.
>
> If I've overlooked something, I'd love to hear about it.
>
> Kenn
>
> [1] https://github.com/apache/beam/pull/662
>
> On Fri, Jul 15, 2016 at 8:24 AM, Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > Just to give people an update, I'm still working on collecting data.
> >
> > On Wed, Jun 29, 2016 at 10:47 AM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > My bad, I didn't know that. Thanks for the clarification!
> > >
> > > On Wed, 29 Jun 2016 at 16:38 Daniel Kulp <dk...@apache.org> wrote:
> > >
> > > >
> > > > > On Jun 27, 2016, at 10:24 AM, Aljoscha Krettek <
> aljoscha@apache.org>
> > > > wrote:
> > > > >
> > > > > Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are
> > > probably
> > > > > best suited for the task. Both of these provide a way for
> generating
> > > > > serializers as well as for specifying an RPC interface. Avro and
> > > > > FlatBuffers are only dealing in serializers and we would have to
> roll
> > > our
> > > > > own RPC system on top of these.
> > > >
> > > >
> > > > Just a point of clarification, Avro does handle RPC as well as
> > > > serialization.   It's one of the main bullets on their overview page:
> > > >
> > > > http://avro.apache.org/docs/current/index.html
> > > >
> > > > Unfortunately, their documentation around the subject really sucks.
> > Some
> > > > info at:
> > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/AVRO/Porting+
> > Existing+RPC+Frameworks
> > > >
> > > > and a “quick start”:
> > > >
> > > > https://github.com/phunt/avro-rpc-quickstart
> > > >
> > > >
> > > >
> > > > --
> > > > Daniel Kulp
> > > > dkulp@apache.org - http://dankulp.com/blog
> > > > Talend Community Coder - http://coders.talend.com
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
This has lain dormant as I was drawn off to other things. But now I'm
looping back on this so there are no surprises in my upcoming (third)
revision to PR #662 [1] to use protocol buffers instead of JSON schema or
Avro (the two prior versions - now I know what the runner API looks like in
every format :-)).

Here's the reasoning:

1. Since the Fn API requires the SDK harness to have protocol buffers
support, there is no portability to be gained by having a proto-independent
JSON schema or Avro schema for the Runner API. As currently designed, a
language will need proto support in order to implement a Beam SDK.

2. Since proto has a JSON format that can be used for human readability,
there's not really a usability benefit to using JSON schema and some other
form of JSON.

3. Generation of helper libraries for proto is nice versus having a json
schema, where support for generating POJOs, etc, might be incomplete or
strange for some languages.

4. Some of the core generic "graph with stuff on the nodes and edges"
definitions can be shared.

If I've overlooked something, I'd love to hear about it.

Kenn

[1] https://github.com/apache/beam/pull/662

On Fri, Jul 15, 2016 at 8:24 AM, Lukasz Cwik <lc...@google.com.invalid>
wrote:

> Just to give people an update, I'm still working on collecting data.
>
> On Wed, Jun 29, 2016 at 10:47 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > My bad, I didn't know that. Thanks for the clarification!
> >
> > On Wed, 29 Jun 2016 at 16:38 Daniel Kulp <dk...@apache.org> wrote:
> >
> > >
> > > > On Jun 27, 2016, at 10:24 AM, Aljoscha Krettek <al...@apache.org>
> > > wrote:
> > > >
> > > > Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are
> > probably
> > > > best suited for the task. Both of these provide a way for generating
> > > > serializers as well as for specifying an RPC interface. Avro and
> > > > FlatBuffers are only dealing in serializers and we would have to roll
> > our
> > > > own RPC system on top of these.
> > >
> > >
> > > Just a point of clarification, Avro does handle RPC as well as
> > > serialization.   It's one of the main bullets on their overview page:
> > >
> > > http://avro.apache.org/docs/current/index.html
> > >
> > > Unfortunately, their documentation around the subject really sucks.
> Some
> > > info at:
> > >
> > >
> > https://cwiki.apache.org/confluence/display/AVRO/Porting+
> Existing+RPC+Frameworks
> > >
> > > and a “quick start”:
> > >
> > > https://github.com/phunt/avro-rpc-quickstart
> > >
> > >
> > >
> > > --
> > > Daniel Kulp
> > > dkulp@apache.org - http://dankulp.com/blog
> > > Talend Community Coder - http://coders.talend.com
> > >
> > >
> >
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
Just to give people an update, I'm still working on collecting data.

On Wed, Jun 29, 2016 at 10:47 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> My bad, I didn't know that. Thanks for the clarification!
>
> On Wed, 29 Jun 2016 at 16:38 Daniel Kulp <dk...@apache.org> wrote:
>
> >
> > > On Jun 27, 2016, at 10:24 AM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> > >
> > > Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are
> probably
> > > best suited for the task. Both of these provide a way for generating
> > > serializers as well as for specifying an RPC interface. Avro and
> > > FlatBuffers are only dealing in serializers and we would have to roll
> our
> > > own RPC system on top of these.
> >
> >
> > Just a point of clarification, Avro does handle RPC as well as
> > serialization.   It's one of the main bullets on their overview page:
> >
> > http://avro.apache.org/docs/current/index.html
> >
> > Unfortunately, their documentation around the subject really sucks.  Some
> > info at:
> >
> >
> https://cwiki.apache.org/confluence/display/AVRO/Porting+Existing+RPC+Frameworks
> >
> > and a “quick start”:
> >
> > https://github.com/phunt/avro-rpc-quickstart
> >
> >
> >
> > --
> > Daniel Kulp
> > dkulp@apache.org - http://dankulp.com/blog
> > Talend Community Coder - http://coders.talend.com
> >
> >
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Aljoscha Krettek <al...@apache.org>.
My bad, I didn't know that. Thanks for the clarification!

On Wed, 29 Jun 2016 at 16:38 Daniel Kulp <dk...@apache.org> wrote:

>
> > On Jun 27, 2016, at 10:24 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
> >
> > Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are probably
> > best suited for the task. Both of these provide a way for generating
> > serializers as well as for specifying an RPC interface. Avro and
> > FlatBuffers are only dealing in serializers and we would have to roll our
> > own RPC system on top of these.
>
>
> Just a point of clarification, Avro does handle RPC as well as
> serialization.   It's one of the main bullets on their overview page:
>
> http://avro.apache.org/docs/current/index.html
>
> Unfortunately, their documentation around the subject really sucks.  Some
> info at:
>
> https://cwiki.apache.org/confluence/display/AVRO/Porting+Existing+RPC+Frameworks
>
> and a “quick start”:
>
> https://github.com/phunt/avro-rpc-quickstart
>
>
>
> --
> Daniel Kulp
> dkulp@apache.org - http://dankulp.com/blog
> Talend Community Coder - http://coders.talend.com
>
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Daniel Kulp <dk...@apache.org>.
> On Jun 27, 2016, at 10:24 AM, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are probably
> best suited for the task. Both of these provide a way for generating
> serializers as well as for specifying an RPC interface. Avro and
> FlatBuffers are only dealing in serializers and we would have to roll our
> own RPC system on top of these.


Just a point of clarification, Avro does handle RPC as well as serialization.   It's one of the main bullets on their overview page:

http://avro.apache.org/docs/current/index.html

Unfortunately, their documentation around the subject really sucks.  Some info at:
https://cwiki.apache.org/confluence/display/AVRO/Porting+Existing+RPC+Frameworks

and a “quick start”:

https://github.com/phunt/avro-rpc-quickstart



-- 
Daniel Kulp
dkulp@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com


Re: [DISCUSS] Beam data plane serialization tech

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks Kenn for expanding on your previous mail. I now have a better idea
of why we need this.

Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are probably
best suited for the task. Both of these provide a way for generating
serializers as well as for specifying an RPC interface. Avro and
FlatBuffers are only dealing in serializers and we would have to roll our
own RPC system on top of these. (It seems the gRPC folks have some work
going on about integrating support for FlatBuffers but not sure when this
is going to be done: https://github.com/grpc/grpc/issues/5438).

From the description and benchmarks FlatBuffers looks very nice, if it
really comes with a huge performance increase we might have to consider
using it with our own RPC instead of Thrift/gRPC+ProtoBuf3. This would mean
some overhead, however.

I would suggest to do some proof-of-concept implementations with both
Thrift and gPRC+ProtoBuf3 and see how it compares to the baseline (the
current implementation where we just directly call methods on the DoFn and
the DoFn calls methods on the outside directly.). This wouldn't have to
create a full stack, just enough to see how interaction with the DoFn would
work for the different systems.

On Wed, 22 Jun 2016 at 23:00 Kenneth Knowles <kl...@google.com.invalid> wrote:

> I wanted to say a bit more to clarify and enliven this discussion. My use
> of the term "data plane" may have been confusing. I didn't mean to focus it
> quite so much on the encoded elements. What I meant to discuss was the
> entirety of performance-sensitive interactions between the runner and
> user-defined functions. So let's drop the implied control/data distinction
> and just talk about the whole interface.
>
> At the risk of writing at length about something everyone knows... the
> motivation for the Fn API is this: we have a few types of user-definable
> functions (UDFs) that occur in pipelines, and we need to invoke them in a
> language-independent manner. These are DoFn, CombineFn, WindowFn,
> BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.
>
> I will show a bad idea: Take the interfaces of the above functions (minus
> Coder, which is special) and just turn them into RPC interfaces, and the
> SDK's job is just to be a trivial or near-trivial bridge from RPC to
> language-specific method calls. This is a bad proposal, but hopefully helps
> to show issues such as:
>
>  - How and when do we deserialize user code / launch a container? (my bad
> idea above doesn't answer; probably too often!)
>  - How and when do we encode/decode elements? (my bad idea above would
> require it between every UDF)
>  - How do we manage calls that are more than simply a stream of elements in
> a bundle? (example: side inputs)
>
> Any Fn API is required to have the same semantics as this simple proposal,
> but should achieve it with superior performance. I'll leave off the details
> since I am not authoring them personally. But let's assume as a baseline
> the approach of executing a fused stage of same-language UDFs in a row
> without any encoding/decoding or RPC, and making a single RPC call per
> bundle (ignoring amortized round trips for streaming bytes).
>
> I gather from this thread these questions (which I may be interpreting
> wrong; apologies if so) and I would like to answer them relative to this
> design sketch:
>
> Q: Since we have one RPC per bundle and it goes through the whole fused
> stage, and we have a whole stream of elements per call, doesn't the data
> dominate the envelope?
> A: In streaming executions, bundles can be very small, so the data will not
> necessarily dominate.
>
> Q: Do we really need structured messages? Perhaps byte streams with fairly
> trivial metadata suffice and we can just hand roll it?
> A: I think that schematized tech is well-proven for adaptability and it is
> also handy for code gen, regardless of performance. So to me the question
> is whether or not we need structured messages at all, or if we can model
> every high throughput communication as coder-encoded streams. I think that
> things like commits to state, acknowledgements of timer firings, pull-based
> requests like side inputs are probably best expressed via a schema. But
> maybe I am overlooking some design ideas.
>
> Q: How will side inputs arrive?
> A: This API is really designed to be pull-based, so it sort of implies a
> great many small RPCs (and caching).
>
> I'm sure I've left off some discussion points, and maybe oversimplified
> some things, but does this answer the questions somewhat? Does this clarify
> the suggested choices of tech? Do you still think we don't need them?
>
> Kenn
>
> On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
> wrote:
>
> > In storm we use JSON as the default communication between shell bolts and
> > shell spouts, which allows for APIs in non JVM languages. It works rather
> > well.  That being said it is also slow, and we made it a plugin so others
> > could make their own, faster, implementations.  For storm both the data
> and
> > the control are serialized to JSON, so I am not sure how much of that is
> > control and how much of it is the data that makes it slow.  I personally
> > would like to see a simple benchmark that implements the basic protocol
> > between the two so we can actually have a more numeric comparison.  As
> well
> > as any pain that someone experienced trying to implement even a proof of
> > concept.
> >
> > I agree with Amit too that long term we may want to think about
> supporting
> > structured data, and rely less on POJOs.  It allows for a lot of
> > optimizations in addition to having out of the box support for
> > serializing/de-serializing them in another language. But perhaps that is
> > more of a layer that sits on top of beam instead, because a lot of the
> > optimizations really make the most since in a declarative DSL like
> context.
> >
> >  - Bobby
> >
> >     On Saturday, June 18, 2016 6:56 AM, Amit Sela <am...@gmail.com>
> > wrote:
> >
> >
> >  My +1 for JSON was for the fact that it's common enough and simpler than
> > Protbuff/Avro/Thrift, and I would guess that (almost) all languages
> > acknowledge it, though I might be wrong here.
> >
> > As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but
> the
> > "hardest" thing I had to do to get it working with Spark was to register
> > 3rd party implementations for Guava Immutable collections. And I honestly
> > don't know if there is one framework that covers everything in all
> (common)
> > languages.
> >
> > Finally, if I understand correctly, the suggestion is to transmit the
> data
> > as bytes with the appropriate coders, correct ? For the new Spark for
> > example, they use Encoders
> > <
> >
> https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
> > >
> > that have an internal schema and allows the engine to avoid
> > deserializations (and other optimizations) using this schema. So while
> the
> > current version of the Spark runner actually transforms objects into
> bytes
> > prior to shuffle, that might not be the best implementation for the next
> > generation of the runner...
> >
> > This is how I see things from my pretty modest experience with
> > serialization frameworks. Please correct me if/where I might be wrong.
> >
> > Thanks,
> > Amit
> >
> > On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> >
> > > In the Runner API proposal doc, there are 10+ different types with
> > several
> > > fields each.
> > > Is it important to have a code generator for the schema?
> > > * simplify the SDK development process
> > > * reduce errors due to differences in custom implementation
> > >
> > > I'm not familiar with tool(s) which can take a JSON schema (e.g.
> > > http://json-schema.org/) and generate code in multiple languages.
> > Anyone?
> > >
> > >
> > > For the Data Plane API, a Runner and SDK must be able to encode
> elements
> > > such as WindowedValue and KVs in such a way that both sides can
> interpret
> > > them. For example, a Runner will be required to implement GBK so it
> must
> > be
> > > able to read the windowing information from the "bytes" transmitted,
> > > additionally it will need to be able to split KV<K, V> records apart
> and
> > > recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant
> > way
> > > of encoding things, the Data Plane API will transmit "bytes" with the
> > > element boundaries encoded in some way. Aljoscha, I agree with you
> that a
> > > good choice for transmitting bytes between VMs/languages is very
> > important.
> > > Even though we are still transmitting mostly "bytes", error handling &
> > > connection handling are still important.
> > > For example, if we were to use gRPC and proto3 with a bidirectional
> > stream
> > > based API, we would get:
> > > the Runner and SDK can both push data both ways (stream from/to GBK,
> > stream
> > > from/to state)
> > > error handling
> > > code generation of client libraries
> > > HTTP/2
> > >
> > > As for the encoding, any SDK can choose any serialization it wants such
> > as
> > > Kryo but to get interoperability with other languages that would
> require
> > > others to implement parts of the Kryo serialization spec to be able to
> > > interpret the "bytes". Thus certain types like KV & WindowedValue
> should
> > be
> > > encoded in a way which allows for this interoperability.
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <am...@gmail.com>
> wrote:
> > >
> > > > +1 on Aljoscha comment, not sure where's the benefit in having a
> > > > "schematic" serialization.
> > > >
> > > > I know that Spark and I think Flink as well, use Kryo
> > > > <https://github.com/EsotericSoftware/kryo> for serialization (to be
> > > > accurate it's Chill <https://github.com/twitter/chill> for Spark)
> and
> > I
> > > > found it very impressive even comparing to "manual" serializations,
> > > >  i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> > > > primitives..
> > > > In addition it clearly supports Java and Scala, and there are 3rd
> party
> > > > libraries for Clojure and Objective-C.
> > > >
> > > > I guess my bottom-line here agrees with Kenneth - performance and
> > > > interoperability - but I'm just not sure if schema based serializers
> > are
> > > > *always* the fastest.
> > > >
> > > > As for pipeline serialization, since performance is not the main
> issue,
> > > and
> > > > I think usability would be very important, I say +1 for JSON.
> > > >
> > > > For anyone who spent sometime on benchmarking serialization
> libraries,
> > > know
> > > > is the time to speak up ;)
> > > >
> > > > Thanks,
> > > > Amit
> > > >
> > > > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <
> aljoscha@apache.org
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > am I correct in assuming that the transmitted envelopes would
> mostly
> > > > > contain coder-serialized values? If so, wouldn't the header of an
> > > > envelope
> > > > > just be the number of contained bytes and number of values? I'm
> > > probably
> > > > > missing something but with these assumptions I don't see the
> benefit
> > of
> > > > > using something like Avro/Thrift/Protobuf for serializing the
> > > main-input
> > > > > value envelopes. We would just need a system that can send byte
> data
> > > > really
> > > > > fast between languages/VMs.
> > > > >
> > > > > By the way, another interesting question (at least for me) is how
> > other
> > > > > data, such as side-inputs, is going to arrive at the DoFn if we
> want
> > to
> > > > > support a general interface for different languages.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles
> <klk@google.com.invalid
> > >
> > > > > wrote:
> > > > >
> > > > > > (Apologies for the formatting)
> > > > > >
> > > > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <
> klk@google.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello everyone!
> > > > > > >
> > > > > > > We are busily working on a Runner API (for building and
> > > transmitting
> > > > > > > pipelines)
> > > > > > > and a Fn API (for invoking user-defined functions found within
> > > > > pipelines)
> > > > > > > as
> > > > > > > outlined in the Beam technical vision [1]. Both of these
> require
> > a
> > > > > > > language-independent serialization technology for
> > interoperability
> > > > > > between
> > > > > > > SDKs
> > > > > > > and runners.
> > > > > > >
> > > > > > > The Fn API includes a high-bandwidth data plane where bundles
> are
> > > > > > > transmitted
> > > > > > > via some serialization/RPC envelope (inside the envelope, the
> > > stream
> > > > of
> > > > > > > elements is encoded with a coder) to transfer bundles between
> the
> > > > > runner
> > > > > > > and
> > > > > > > the SDK, so performance is extremely important. There are many
> > > > choices
> > > > > > for
> > > > > > > high
> > > > > > > performance serialization, and we would like to start the
> > > > conversation
> > > > > > > about
> > > > > > > what serialization technology is best for Beam.
> > > > > > >
> > > > > > > The goal of this discussion is to arrive at consensus on the
> > > > question:
> > > > > > > What
> > > > > > > serialization technology should we use for the data plane
> > envelope
> > > of
> > > > > the
> > > > > > > Fn
> > > > > > > API?
> > > > > > >
> > > > > > > To facilitate community discussion, we looked at the available
> > > > > > > technologies and
> > > > > > > tried to narrow the choices based on three criteria:
> > > > > > >
> > > > > > >  - Performance: What is the size of serialized data? How do we
> > > expect
> > > > > the
> > > > > > >    technology to affect pipeline speed and cost? etc
> > > > > > >
> > > > > > >  - Language support: Does the technology support the most
> > > widespread
> > > > > > > language
> > > > > > >    for data processing? Does it have a vibrant ecosystem of
> > > > contributed
> > > > > > >    language bindings? etc
> > > > > > >
> > > > > > >  - Community: What is the adoption of the technology? How
> mature
> > is
> > > > it?
> > > > > > > How
> > > > > > >    active is development? How is the documentation? etc
> > > > > > >
> > > > > > > Given these criteria, we came up with four technologies that
> are
> > > good
> > > > > > > contenders. All have similar & adequate schema capabilities.
> > > > > > >
> > > > > > >  - Apache Avro: Does not require code gen, but embedding the
> > schema
> > > > in
> > > > > > the
> > > > > > > data
> > > > > > >    could be an issue. Very popular.
> > > > > > >
> > > > > > >  - Apache Thrift: Probably a bit faster and compact than Avro.
> A
> > > huge
> > > > > > > number of
> > > > > > >    language supported.
> > > > > > >
> > > > > > >  - Protocol Buffers 3: Incorporates the lessons that Google has
> > > > learned
> > > > > > > through
> > > > > > >    long-term use of Protocol Buffers.
> > > > > > >
> > > > > > >  - FlatBuffers: Some benchmarks imply great performance from
> the
> > > > > > zero-copy
> > > > > > > mmap
> > > > > > >    idea. We would need to run representative experiments.
> > > > > > >
> > > > > > > I want to emphasize that this is a community decision, and this
> > > > thread
> > > > > is
> > > > > > > just
> > > > > > > the conversation starter for us all to weigh in. We just wanted
> > to
> > > do
> > > > > > some
> > > > > > > legwork to focus the discussion if we could.
> > > > > > >
> > > > > > > And there's a minor follow-up question: Once we settle here, is
> > > that
> > > > > > > technology
> > > > > > > also suitable for the low-bandwidth Runner API for defining
> > > > pipelines,
> > > > > or
> > > > > > > does
> > > > > > > anyone think we need to consider a second technology (like
> JSON)
> > > for
> > > > > > > usability
> > > > > > > reasons?
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> >
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
I wanted to say a bit more to clarify and enliven this discussion. My use
of the term "data plane" may have been confusing. I didn't mean to focus it
quite so much on the encoded elements. What I meant to discuss was the
entirety of performance-sensitive interactions between the runner and
user-defined functions. So let's drop the implied control/data distinction
and just talk about the whole interface.

At the risk of writing at length about something everyone knows... the
motivation for the Fn API is this: we have a few types of user-definable
functions (UDFs) that occur in pipelines, and we need to invoke them in a
language-independent manner. These are DoFn, CombineFn, WindowFn,
BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.

I will show a bad idea: Take the interfaces of the above functions (minus
Coder, which is special) and just turn them into RPC interfaces, and the
SDK's job is just to be a trivial or near-trivial bridge from RPC to
language-specific method calls. This is a bad proposal, but hopefully helps
to show issues such as:

 - How and when do we deserialize user code / launch a container? (my bad
idea above doesn't answer; probably too often!)
 - How and when do we encode/decode elements? (my bad idea above would
require it between every UDF)
 - How do we manage calls that are more than simply a stream of elements in
a bundle? (example: side inputs)

Any Fn API is required to have the same semantics as this simple proposal,
but should achieve it with superior performance. I'll leave off the details
since I am not authoring them personally. But let's assume as a baseline
the approach of executing a fused stage of same-language UDFs in a row
without any encoding/decoding or RPC, and making a single RPC call per
bundle (ignoring amortized round trips for streaming bytes).

I gather from this thread these questions (which I may be interpreting
wrong; apologies if so) and I would like to answer them relative to this
design sketch:

Q: Since we have one RPC per bundle and it goes through the whole fused
stage, and we have a whole stream of elements per call, doesn't the data
dominate the envelope?
A: In streaming executions, bundles can be very small, so the data will not
necessarily dominate.

Q: Do we really need structured messages? Perhaps byte streams with fairly
trivial metadata suffice and we can just hand roll it?
A: I think that schematized tech is well-proven for adaptability and it is
also handy for code gen, regardless of performance. So to me the question
is whether or not we need structured messages at all, or if we can model
every high throughput communication as coder-encoded streams. I think that
things like commits to state, acknowledgements of timer firings, pull-based
requests like side inputs are probably best expressed via a schema. But
maybe I am overlooking some design ideas.

Q: How will side inputs arrive?
A: This API is really designed to be pull-based, so it sort of implies a
great many small RPCs (and caching).

I'm sure I've left off some discussion points, and maybe oversimplified
some things, but does this answer the questions somewhat? Does this clarify
the suggested choices of tech? Do you still think we don't need them?

Kenn

On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
wrote:

> In storm we use JSON as the default communication between shell bolts and
> shell spouts, which allows for APIs in non JVM languages. It works rather
> well.  That being said it is also slow, and we made it a plugin so others
> could make their own, faster, implementations.  For storm both the data and
> the control are serialized to JSON, so I am not sure how much of that is
> control and how much of it is the data that makes it slow.  I personally
> would like to see a simple benchmark that implements the basic protocol
> between the two so we can actually have a more numeric comparison.  As well
> as any pain that someone experienced trying to implement even a proof of
> concept.
>
> I agree with Amit too that long term we may want to think about supporting
> structured data, and rely less on POJOs.  It allows for a lot of
> optimizations in addition to having out of the box support for
> serializing/de-serializing them in another language. But perhaps that is
> more of a layer that sits on top of beam instead, because a lot of the
> optimizations really make the most since in a declarative DSL like context.
>
>  - Bobby
>
>     On Saturday, June 18, 2016 6:56 AM, Amit Sela <am...@gmail.com>
> wrote:
>
>
>  My +1 for JSON was for the fact that it's common enough and simpler than
> Protbuff/Avro/Thrift, and I would guess that (almost) all languages
> acknowledge it, though I might be wrong here.
>
> As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but the
> "hardest" thing I had to do to get it working with Spark was to register
> 3rd party implementations for Guava Immutable collections. And I honestly
> don't know if there is one framework that covers everything in all (common)
> languages.
>
> Finally, if I understand correctly, the suggestion is to transmit the data
> as bytes with the appropriate coders, correct ? For the new Spark for
> example, they use Encoders
> <
> https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
> >
> that have an internal schema and allows the engine to avoid
> deserializations (and other optimizations) using this schema. So while the
> current version of the Spark runner actually transforms objects into bytes
> prior to shuffle, that might not be the best implementation for the next
> generation of the runner...
>
> This is how I see things from my pretty modest experience with
> serialization frameworks. Please correct me if/where I might be wrong.
>
> Thanks,
> Amit
>
> On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > In the Runner API proposal doc, there are 10+ different types with
> several
> > fields each.
> > Is it important to have a code generator for the schema?
> > * simplify the SDK development process
> > * reduce errors due to differences in custom implementation
> >
> > I'm not familiar with tool(s) which can take a JSON schema (e.g.
> > http://json-schema.org/) and generate code in multiple languages.
> Anyone?
> >
> >
> > For the Data Plane API, a Runner and SDK must be able to encode elements
> > such as WindowedValue and KVs in such a way that both sides can interpret
> > them. For example, a Runner will be required to implement GBK so it must
> be
> > able to read the windowing information from the "bytes" transmitted,
> > additionally it will need to be able to split KV<K, V> records apart and
> > recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant
> way
> > of encoding things, the Data Plane API will transmit "bytes" with the
> > element boundaries encoded in some way. Aljoscha, I agree with you that a
> > good choice for transmitting bytes between VMs/languages is very
> important.
> > Even though we are still transmitting mostly "bytes", error handling &
> > connection handling are still important.
> > For example, if we were to use gRPC and proto3 with a bidirectional
> stream
> > based API, we would get:
> > the Runner and SDK can both push data both ways (stream from/to GBK,
> stream
> > from/to state)
> > error handling
> > code generation of client libraries
> > HTTP/2
> >
> > As for the encoding, any SDK can choose any serialization it wants such
> as
> > Kryo but to get interoperability with other languages that would require
> > others to implement parts of the Kryo serialization spec to be able to
> > interpret the "bytes". Thus certain types like KV & WindowedValue should
> be
> > encoded in a way which allows for this interoperability.
> >
> >
> >
> >
> >
> >
> > On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <am...@gmail.com> wrote:
> >
> > > +1 on Aljoscha comment, not sure where's the benefit in having a
> > > "schematic" serialization.
> > >
> > > I know that Spark and I think Flink as well, use Kryo
> > > <https://github.com/EsotericSoftware/kryo> for serialization (to be
> > > accurate it's Chill <https://github.com/twitter/chill> for Spark) and
> I
> > > found it very impressive even comparing to "manual" serializations,
> > >  i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> > > primitives..
> > > In addition it clearly supports Java and Scala, and there are 3rd party
> > > libraries for Clojure and Objective-C.
> > >
> > > I guess my bottom-line here agrees with Kenneth - performance and
> > > interoperability - but I'm just not sure if schema based serializers
> are
> > > *always* the fastest.
> > >
> > > As for pipeline serialization, since performance is not the main issue,
> > and
> > > I think usability would be very important, I say +1 for JSON.
> > >
> > > For anyone who spent sometime on benchmarking serialization libraries,
> > know
> > > is the time to speak up ;)
> > >
> > > Thanks,
> > > Amit
> > >
> > > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <aljoscha@apache.org
> >
> > > wrote:
> > >
> > > > Hi,
> > > > am I correct in assuming that the transmitted envelopes would mostly
> > > > contain coder-serialized values? If so, wouldn't the header of an
> > > envelope
> > > > just be the number of contained bytes and number of values? I'm
> > probably
> > > > missing something but with these assumptions I don't see the benefit
> of
> > > > using something like Avro/Thrift/Protobuf for serializing the
> > main-input
> > > > value envelopes. We would just need a system that can send byte data
> > > really
> > > > fast between languages/VMs.
> > > >
> > > > By the way, another interesting question (at least for me) is how
> other
> > > > data, such as side-inputs, is going to arrive at the DoFn if we want
> to
> > > > support a general interface for different languages.
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <klk@google.com.invalid
> >
> > > > wrote:
> > > >
> > > > > (Apologies for the formatting)
> > > > >
> > > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <kl...@google.com>
> > > > wrote:
> > > > >
> > > > > > Hello everyone!
> > > > > >
> > > > > > We are busily working on a Runner API (for building and
> > transmitting
> > > > > > pipelines)
> > > > > > and a Fn API (for invoking user-defined functions found within
> > > > pipelines)
> > > > > > as
> > > > > > outlined in the Beam technical vision [1]. Both of these require
> a
> > > > > > language-independent serialization technology for
> interoperability
> > > > > between
> > > > > > SDKs
> > > > > > and runners.
> > > > > >
> > > > > > The Fn API includes a high-bandwidth data plane where bundles are
> > > > > > transmitted
> > > > > > via some serialization/RPC envelope (inside the envelope, the
> > stream
> > > of
> > > > > > elements is encoded with a coder) to transfer bundles between the
> > > > runner
> > > > > > and
> > > > > > the SDK, so performance is extremely important. There are many
> > > choices
> > > > > for
> > > > > > high
> > > > > > performance serialization, and we would like to start the
> > > conversation
> > > > > > about
> > > > > > what serialization technology is best for Beam.
> > > > > >
> > > > > > The goal of this discussion is to arrive at consensus on the
> > > question:
> > > > > > What
> > > > > > serialization technology should we use for the data plane
> envelope
> > of
> > > > the
> > > > > > Fn
> > > > > > API?
> > > > > >
> > > > > > To facilitate community discussion, we looked at the available
> > > > > > technologies and
> > > > > > tried to narrow the choices based on three criteria:
> > > > > >
> > > > > >  - Performance: What is the size of serialized data? How do we
> > expect
> > > > the
> > > > > >    technology to affect pipeline speed and cost? etc
> > > > > >
> > > > > >  - Language support: Does the technology support the most
> > widespread
> > > > > > language
> > > > > >    for data processing? Does it have a vibrant ecosystem of
> > > contributed
> > > > > >    language bindings? etc
> > > > > >
> > > > > >  - Community: What is the adoption of the technology? How mature
> is
> > > it?
> > > > > > How
> > > > > >    active is development? How is the documentation? etc
> > > > > >
> > > > > > Given these criteria, we came up with four technologies that are
> > good
> > > > > > contenders. All have similar & adequate schema capabilities.
> > > > > >
> > > > > >  - Apache Avro: Does not require code gen, but embedding the
> schema
> > > in
> > > > > the
> > > > > > data
> > > > > >    could be an issue. Very popular.
> > > > > >
> > > > > >  - Apache Thrift: Probably a bit faster and compact than Avro. A
> > huge
> > > > > > number of
> > > > > >    language supported.
> > > > > >
> > > > > >  - Protocol Buffers 3: Incorporates the lessons that Google has
> > > learned
> > > > > > through
> > > > > >    long-term use of Protocol Buffers.
> > > > > >
> > > > > >  - FlatBuffers: Some benchmarks imply great performance from the
> > > > > zero-copy
> > > > > > mmap
> > > > > >    idea. We would need to run representative experiments.
> > > > > >
> > > > > > I want to emphasize that this is a community decision, and this
> > > thread
> > > > is
> > > > > > just
> > > > > > the conversation starter for us all to weigh in. We just wanted
> to
> > do
> > > > > some
> > > > > > legwork to focus the discussion if we could.
> > > > > >
> > > > > > And there's a minor follow-up question: Once we settle here, is
> > that
> > > > > > technology
> > > > > > also suitable for the low-bandwidth Runner API for defining
> > > pipelines,
> > > > or
> > > > > > does
> > > > > > anyone think we need to consider a second technology (like JSON)
> > for
> > > > > > usability
> > > > > > reasons?
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
>
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
In storm we use JSON as the default communication between shell bolts and shell spouts, which allows for APIs in non JVM languages. It works rather well.  That being said it is also slow, and we made it a plugin so others could make their own, faster, implementations.  For storm both the data and the control are serialized to JSON, so I am not sure how much of that is control and how much of it is the data that makes it slow.  I personally would like to see a simple benchmark that implements the basic protocol between the two so we can actually have a more numeric comparison.  As well as any pain that someone experienced trying to implement even a proof of concept.

I agree with Amit too that long term we may want to think about supporting structured data, and rely less on POJOs.  It allows for a lot of optimizations in addition to having out of the box support for serializing/de-serializing them in another language. But perhaps that is more of a layer that sits on top of beam instead, because a lot of the optimizations really make the most since in a declarative DSL like context.

 - Bobby 

    On Saturday, June 18, 2016 6:56 AM, Amit Sela <am...@gmail.com> wrote:
 

 My +1 for JSON was for the fact that it's common enough and simpler than
Protbuff/Avro/Thrift, and I would guess that (almost) all languages
acknowledge it, though I might be wrong here.

As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but the
"hardest" thing I had to do to get it working with Spark was to register
3rd party implementations for Guava Immutable collections. And I honestly
don't know if there is one framework that covers everything in all (common)
languages.

Finally, if I understand correctly, the suggestion is to transmit the data
as bytes with the appropriate coders, correct ? For the new Spark for
example, they use Encoders
<https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html>
that have an internal schema and allows the engine to avoid
deserializations (and other optimizations) using this schema. So while the
current version of the Spark runner actually transforms objects into bytes
prior to shuffle, that might not be the best implementation for the next
generation of the runner...

This is how I see things from my pretty modest experience with
serialization frameworks. Please correct me if/where I might be wrong.

Thanks,
Amit

On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <lc...@google.com.invalid>
wrote:

> In the Runner API proposal doc, there are 10+ different types with several
> fields each.
> Is it important to have a code generator for the schema?
> * simplify the SDK development process
> * reduce errors due to differences in custom implementation
>
> I'm not familiar with tool(s) which can take a JSON schema (e.g.
> http://json-schema.org/) and generate code in multiple languages. Anyone?
>
>
> For the Data Plane API, a Runner and SDK must be able to encode elements
> such as WindowedValue and KVs in such a way that both sides can interpret
> them. For example, a Runner will be required to implement GBK so it must be
> able to read the windowing information from the "bytes" transmitted,
> additionally it will need to be able to split KV<K, V> records apart and
> recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant way
> of encoding things, the Data Plane API will transmit "bytes" with the
> element boundaries encoded in some way. Aljoscha, I agree with you that a
> good choice for transmitting bytes between VMs/languages is very important.
> Even though we are still transmitting mostly "bytes", error handling &
> connection handling are still important.
> For example, if we were to use gRPC and proto3 with a bidirectional stream
> based API, we would get:
> the Runner and SDK can both push data both ways (stream from/to GBK, stream
> from/to state)
> error handling
> code generation of client libraries
> HTTP/2
>
> As for the encoding, any SDK can choose any serialization it wants such as
> Kryo but to get interoperability with other languages that would require
> others to implement parts of the Kryo serialization spec to be able to
> interpret the "bytes". Thus certain types like KV & WindowedValue should be
> encoded in a way which allows for this interoperability.
>
>
>
>
>
>
> On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <am...@gmail.com> wrote:
>
> > +1 on Aljoscha comment, not sure where's the benefit in having a
> > "schematic" serialization.
> >
> > I know that Spark and I think Flink as well, use Kryo
> > <https://github.com/EsotericSoftware/kryo> for serialization (to be
> > accurate it's Chill <https://github.com/twitter/chill> for Spark) and I
> > found it very impressive even comparing to "manual" serializations,
> >  i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> > primitives..
> > In addition it clearly supports Java and Scala, and there are 3rd party
> > libraries for Clojure and Objective-C.
> >
> > I guess my bottom-line here agrees with Kenneth - performance and
> > interoperability - but I'm just not sure if schema based serializers are
> > *always* the fastest.
> >
> > As for pipeline serialization, since performance is not the main issue,
> and
> > I think usability would be very important, I say +1 for JSON.
> >
> > For anyone who spent sometime on benchmarking serialization libraries,
> know
> > is the time to speak up ;)
> >
> > Thanks,
> > Amit
> >
> > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Hi,
> > > am I correct in assuming that the transmitted envelopes would mostly
> > > contain coder-serialized values? If so, wouldn't the header of an
> > envelope
> > > just be the number of contained bytes and number of values? I'm
> probably
> > > missing something but with these assumptions I don't see the benefit of
> > > using something like Avro/Thrift/Protobuf for serializing the
> main-input
> > > value envelopes. We would just need a system that can send byte data
> > really
> > > fast between languages/VMs.
> > >
> > > By the way, another interesting question (at least for me) is how other
> > > data, such as side-inputs, is going to arrive at the DoFn if we want to
> > > support a general interface for different languages.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <kl...@google.com.invalid>
> > > wrote:
> > >
> > > > (Apologies for the formatting)
> > > >
> > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <kl...@google.com>
> > > wrote:
> > > >
> > > > > Hello everyone!
> > > > >
> > > > > We are busily working on a Runner API (for building and
> transmitting
> > > > > pipelines)
> > > > > and a Fn API (for invoking user-defined functions found within
> > > pipelines)
> > > > > as
> > > > > outlined in the Beam technical vision [1]. Both of these require a
> > > > > language-independent serialization technology for interoperability
> > > > between
> > > > > SDKs
> > > > > and runners.
> > > > >
> > > > > The Fn API includes a high-bandwidth data plane where bundles are
> > > > > transmitted
> > > > > via some serialization/RPC envelope (inside the envelope, the
> stream
> > of
> > > > > elements is encoded with a coder) to transfer bundles between the
> > > runner
> > > > > and
> > > > > the SDK, so performance is extremely important. There are many
> > choices
> > > > for
> > > > > high
> > > > > performance serialization, and we would like to start the
> > conversation
> > > > > about
> > > > > what serialization technology is best for Beam.
> > > > >
> > > > > The goal of this discussion is to arrive at consensus on the
> > question:
> > > > > What
> > > > > serialization technology should we use for the data plane envelope
> of
> > > the
> > > > > Fn
> > > > > API?
> > > > >
> > > > > To facilitate community discussion, we looked at the available
> > > > > technologies and
> > > > > tried to narrow the choices based on three criteria:
> > > > >
> > > > >  - Performance: What is the size of serialized data? How do we
> expect
> > > the
> > > > >    technology to affect pipeline speed and cost? etc
> > > > >
> > > > >  - Language support: Does the technology support the most
> widespread
> > > > > language
> > > > >    for data processing? Does it have a vibrant ecosystem of
> > contributed
> > > > >    language bindings? etc
> > > > >
> > > > >  - Community: What is the adoption of the technology? How mature is
> > it?
> > > > > How
> > > > >    active is development? How is the documentation? etc
> > > > >
> > > > > Given these criteria, we came up with four technologies that are
> good
> > > > > contenders. All have similar & adequate schema capabilities.
> > > > >
> > > > >  - Apache Avro: Does not require code gen, but embedding the schema
> > in
> > > > the
> > > > > data
> > > > >    could be an issue. Very popular.
> > > > >
> > > > >  - Apache Thrift: Probably a bit faster and compact than Avro. A
> huge
> > > > > number of
> > > > >    language supported.
> > > > >
> > > > >  - Protocol Buffers 3: Incorporates the lessons that Google has
> > learned
> > > > > through
> > > > >    long-term use of Protocol Buffers.
> > > > >
> > > > >  - FlatBuffers: Some benchmarks imply great performance from the
> > > > zero-copy
> > > > > mmap
> > > > >    idea. We would need to run representative experiments.
> > > > >
> > > > > I want to emphasize that this is a community decision, and this
> > thread
> > > is
> > > > > just
> > > > > the conversation starter for us all to weigh in. We just wanted to
> do
> > > > some
> > > > > legwork to focus the discussion if we could.
> > > > >
> > > > > And there's a minor follow-up question: Once we settle here, is
> that
> > > > > technology
> > > > > also suitable for the low-bandwidth Runner API for defining
> > pipelines,
> > > or
> > > > > does
> > > > > anyone think we need to consider a second technology (like JSON)
> for
> > > > > usability
> > > > > reasons?
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > > > >
> > > > >
> > > >
> > >
> >
>


  

Re: [DISCUSS] Beam data plane serialization tech

Posted by Amit Sela <am...@gmail.com>.
My +1 for JSON was for the fact that it's common enough and simpler than
Protbuff/Avro/Thrift, and I would guess that (almost) all languages
acknowledge it, though I might be wrong here.

As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but the
"hardest" thing I had to do to get it working with Spark was to register
3rd party implementations for Guava Immutable collections. And I honestly
don't know if there is one framework that covers everything in all (common)
languages.

Finally, if I understand correctly, the suggestion is to transmit the data
as bytes with the appropriate coders, correct ? For the new Spark for
example, they use Encoders
<https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html>
that have an internal schema and allows the engine to avoid
deserializations (and other optimizations) using this schema. So while the
current version of the Spark runner actually transforms objects into bytes
prior to shuffle, that might not be the best implementation for the next
generation of the runner...

This is how I see things from my pretty modest experience with
serialization frameworks. Please correct me if/where I might be wrong.

Thanks,
Amit

On Fri, Jun 17, 2016 at 8:48 PM Lukasz Cwik <lc...@google.com.invalid>
wrote:

> In the Runner API proposal doc, there are 10+ different types with several
> fields each.
> Is it important to have a code generator for the schema?
> * simplify the SDK development process
> * reduce errors due to differences in custom implementation
>
> I'm not familiar with tool(s) which can take a JSON schema (e.g.
> http://json-schema.org/) and generate code in multiple languages. Anyone?
>
>
> For the Data Plane API, a Runner and SDK must be able to encode elements
> such as WindowedValue and KVs in such a way that both sides can interpret
> them. For example, a Runner will be required to implement GBK so it must be
> able to read the windowing information from the "bytes" transmitted,
> additionally it will need to be able to split KV<K, V> records apart and
> recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant way
> of encoding things, the Data Plane API will transmit "bytes" with the
> element boundaries encoded in some way. Aljoscha, I agree with you that a
> good choice for transmitting bytes between VMs/languages is very important.
> Even though we are still transmitting mostly "bytes", error handling &
> connection handling are still important.
> For example, if we were to use gRPC and proto3 with a bidirectional stream
> based API, we would get:
> the Runner and SDK can both push data both ways (stream from/to GBK, stream
> from/to state)
> error handling
> code generation of client libraries
> HTTP/2
>
> As for the encoding, any SDK can choose any serialization it wants such as
> Kryo but to get interoperability with other languages that would require
> others to implement parts of the Kryo serialization spec to be able to
> interpret the "bytes". Thus certain types like KV & WindowedValue should be
> encoded in a way which allows for this interoperability.
>
>
>
>
>
>
> On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <am...@gmail.com> wrote:
>
> > +1 on Aljoscha comment, not sure where's the benefit in having a
> > "schematic" serialization.
> >
> > I know that Spark and I think Flink as well, use Kryo
> > <https://github.com/EsotericSoftware/kryo> for serialization (to be
> > accurate it's Chill <https://github.com/twitter/chill> for Spark) and I
> > found it very impressive even comparing to "manual" serializations,
> >  i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> > primitives..
> > In addition it clearly supports Java and Scala, and there are 3rd party
> > libraries for Clojure and Objective-C.
> >
> > I guess my bottom-line here agrees with Kenneth - performance and
> > interoperability - but I'm just not sure if schema based serializers are
> > *always* the fastest.
> >
> > As for pipeline serialization, since performance is not the main issue,
> and
> > I think usability would be very important, I say +1 for JSON.
> >
> > For anyone who spent sometime on benchmarking serialization libraries,
> know
> > is the time to speak up ;)
> >
> > Thanks,
> > Amit
> >
> > On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Hi,
> > > am I correct in assuming that the transmitted envelopes would mostly
> > > contain coder-serialized values? If so, wouldn't the header of an
> > envelope
> > > just be the number of contained bytes and number of values? I'm
> probably
> > > missing something but with these assumptions I don't see the benefit of
> > > using something like Avro/Thrift/Protobuf for serializing the
> main-input
> > > value envelopes. We would just need a system that can send byte data
> > really
> > > fast between languages/VMs.
> > >
> > > By the way, another interesting question (at least for me) is how other
> > > data, such as side-inputs, is going to arrive at the DoFn if we want to
> > > support a general interface for different languages.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <kl...@google.com.invalid>
> > > wrote:
> > >
> > > > (Apologies for the formatting)
> > > >
> > > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <kl...@google.com>
> > > wrote:
> > > >
> > > > > Hello everyone!
> > > > >
> > > > > We are busily working on a Runner API (for building and
> transmitting
> > > > > pipelines)
> > > > > and a Fn API (for invoking user-defined functions found within
> > > pipelines)
> > > > > as
> > > > > outlined in the Beam technical vision [1]. Both of these require a
> > > > > language-independent serialization technology for interoperability
> > > > between
> > > > > SDKs
> > > > > and runners.
> > > > >
> > > > > The Fn API includes a high-bandwidth data plane where bundles are
> > > > > transmitted
> > > > > via some serialization/RPC envelope (inside the envelope, the
> stream
> > of
> > > > > elements is encoded with a coder) to transfer bundles between the
> > > runner
> > > > > and
> > > > > the SDK, so performance is extremely important. There are many
> > choices
> > > > for
> > > > > high
> > > > > performance serialization, and we would like to start the
> > conversation
> > > > > about
> > > > > what serialization technology is best for Beam.
> > > > >
> > > > > The goal of this discussion is to arrive at consensus on the
> > question:
> > > > > What
> > > > > serialization technology should we use for the data plane envelope
> of
> > > the
> > > > > Fn
> > > > > API?
> > > > >
> > > > > To facilitate community discussion, we looked at the available
> > > > > technologies and
> > > > > tried to narrow the choices based on three criteria:
> > > > >
> > > > >  - Performance: What is the size of serialized data? How do we
> expect
> > > the
> > > > >    technology to affect pipeline speed and cost? etc
> > > > >
> > > > >  - Language support: Does the technology support the most
> widespread
> > > > > language
> > > > >    for data processing? Does it have a vibrant ecosystem of
> > contributed
> > > > >    language bindings? etc
> > > > >
> > > > >  - Community: What is the adoption of the technology? How mature is
> > it?
> > > > > How
> > > > >    active is development? How is the documentation? etc
> > > > >
> > > > > Given these criteria, we came up with four technologies that are
> good
> > > > > contenders. All have similar & adequate schema capabilities.
> > > > >
> > > > >  - Apache Avro: Does not require code gen, but embedding the schema
> > in
> > > > the
> > > > > data
> > > > >    could be an issue. Very popular.
> > > > >
> > > > >  - Apache Thrift: Probably a bit faster and compact than Avro. A
> huge
> > > > > number of
> > > > >    language supported.
> > > > >
> > > > >  - Protocol Buffers 3: Incorporates the lessons that Google has
> > learned
> > > > > through
> > > > >    long-term use of Protocol Buffers.
> > > > >
> > > > >  - FlatBuffers: Some benchmarks imply great performance from the
> > > > zero-copy
> > > > > mmap
> > > > >    idea. We would need to run representative experiments.
> > > > >
> > > > > I want to emphasize that this is a community decision, and this
> > thread
> > > is
> > > > > just
> > > > > the conversation starter for us all to weigh in. We just wanted to
> do
> > > > some
> > > > > legwork to focus the discussion if we could.
> > > > >
> > > > > And there's a minor follow-up question: Once we settle here, is
> that
> > > > > technology
> > > > > also suitable for the low-bandwidth Runner API for defining
> > pipelines,
> > > or
> > > > > does
> > > > > anyone think we need to consider a second technology (like JSON)
> for
> > > > > usability
> > > > > reasons?
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
In the Runner API proposal doc, there are 10+ different types with several
fields each.
Is it important to have a code generator for the schema?
* simplify the SDK development process
* reduce errors due to differences in custom implementation

I'm not familiar with tool(s) which can take a JSON schema (e.g.
http://json-schema.org/) and generate code in multiple languages. Anyone?


For the Data Plane API, a Runner and SDK must be able to encode elements
such as WindowedValue and KVs in such a way that both sides can interpret
them. For example, a Runner will be required to implement GBK so it must be
able to read the windowing information from the "bytes" transmitted,
additionally it will need to be able to split KV<K, V> records apart and
recreate KV<K, Iterable<V>> for the SDK. Since Coders are the dominant way
of encoding things, the Data Plane API will transmit "bytes" with the
element boundaries encoded in some way. Aljoscha, I agree with you that a
good choice for transmitting bytes between VMs/languages is very important.
Even though we are still transmitting mostly "bytes", error handling &
connection handling are still important.
For example, if we were to use gRPC and proto3 with a bidirectional stream
based API, we would get:
the Runner and SDK can both push data both ways (stream from/to GBK, stream
from/to state)
error handling
code generation of client libraries
HTTP/2

As for the encoding, any SDK can choose any serialization it wants such as
Kryo but to get interoperability with other languages that would require
others to implement parts of the Kryo serialization spec to be able to
interpret the "bytes". Thus certain types like KV & WindowedValue should be
encoded in a way which allows for this interoperability.






On Fri, Jun 17, 2016 at 3:20 AM, Amit Sela <am...@gmail.com> wrote:

> +1 on Aljoscha comment, not sure where's the benefit in having a
> "schematic" serialization.
>
> I know that Spark and I think Flink as well, use Kryo
> <https://github.com/EsotericSoftware/kryo> for serialization (to be
> accurate it's Chill <https://github.com/twitter/chill> for Spark) and I
> found it very impressive even comparing to "manual" serializations,
>  i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
> primitives..
> In addition it clearly supports Java and Scala, and there are 3rd party
> libraries for Clojure and Objective-C.
>
> I guess my bottom-line here agrees with Kenneth - performance and
> interoperability - but I'm just not sure if schema based serializers are
> *always* the fastest.
>
> As for pipeline serialization, since performance is not the main issue, and
> I think usability would be very important, I say +1 for JSON.
>
> For anyone who spent sometime on benchmarking serialization libraries, know
> is the time to speak up ;)
>
> Thanks,
> Amit
>
> On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Hi,
> > am I correct in assuming that the transmitted envelopes would mostly
> > contain coder-serialized values? If so, wouldn't the header of an
> envelope
> > just be the number of contained bytes and number of values? I'm probably
> > missing something but with these assumptions I don't see the benefit of
> > using something like Avro/Thrift/Protobuf for serializing the main-input
> > value envelopes. We would just need a system that can send byte data
> really
> > fast between languages/VMs.
> >
> > By the way, another interesting question (at least for me) is how other
> > data, such as side-inputs, is going to arrive at the DoFn if we want to
> > support a general interface for different languages.
> >
> > Cheers,
> > Aljoscha
> >
> > On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <kl...@google.com.invalid>
> > wrote:
> >
> > > (Apologies for the formatting)
> > >
> > > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <kl...@google.com>
> > wrote:
> > >
> > > > Hello everyone!
> > > >
> > > > We are busily working on a Runner API (for building and transmitting
> > > > pipelines)
> > > > and a Fn API (for invoking user-defined functions found within
> > pipelines)
> > > > as
> > > > outlined in the Beam technical vision [1]. Both of these require a
> > > > language-independent serialization technology for interoperability
> > > between
> > > > SDKs
> > > > and runners.
> > > >
> > > > The Fn API includes a high-bandwidth data plane where bundles are
> > > > transmitted
> > > > via some serialization/RPC envelope (inside the envelope, the stream
> of
> > > > elements is encoded with a coder) to transfer bundles between the
> > runner
> > > > and
> > > > the SDK, so performance is extremely important. There are many
> choices
> > > for
> > > > high
> > > > performance serialization, and we would like to start the
> conversation
> > > > about
> > > > what serialization technology is best for Beam.
> > > >
> > > > The goal of this discussion is to arrive at consensus on the
> question:
> > > > What
> > > > serialization technology should we use for the data plane envelope of
> > the
> > > > Fn
> > > > API?
> > > >
> > > > To facilitate community discussion, we looked at the available
> > > > technologies and
> > > > tried to narrow the choices based on three criteria:
> > > >
> > > >  - Performance: What is the size of serialized data? How do we expect
> > the
> > > >    technology to affect pipeline speed and cost? etc
> > > >
> > > >  - Language support: Does the technology support the most widespread
> > > > language
> > > >    for data processing? Does it have a vibrant ecosystem of
> contributed
> > > >    language bindings? etc
> > > >
> > > >  - Community: What is the adoption of the technology? How mature is
> it?
> > > > How
> > > >    active is development? How is the documentation? etc
> > > >
> > > > Given these criteria, we came up with four technologies that are good
> > > > contenders. All have similar & adequate schema capabilities.
> > > >
> > > >  - Apache Avro: Does not require code gen, but embedding the schema
> in
> > > the
> > > > data
> > > >    could be an issue. Very popular.
> > > >
> > > >  - Apache Thrift: Probably a bit faster and compact than Avro. A huge
> > > > number of
> > > >    language supported.
> > > >
> > > >  - Protocol Buffers 3: Incorporates the lessons that Google has
> learned
> > > > through
> > > >    long-term use of Protocol Buffers.
> > > >
> > > >  - FlatBuffers: Some benchmarks imply great performance from the
> > > zero-copy
> > > > mmap
> > > >    idea. We would need to run representative experiments.
> > > >
> > > > I want to emphasize that this is a community decision, and this
> thread
> > is
> > > > just
> > > > the conversation starter for us all to weigh in. We just wanted to do
> > > some
> > > > legwork to focus the discussion if we could.
> > > >
> > > > And there's a minor follow-up question: Once we settle here, is that
> > > > technology
> > > > also suitable for the low-bandwidth Runner API for defining
> pipelines,
> > or
> > > > does
> > > > anyone think we need to consider a second technology (like JSON) for
> > > > usability
> > > > reasons?
> > > >
> > > > [1]
> > > >
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Amit Sela <am...@gmail.com>.
+1 on Aljoscha comment, not sure where's the benefit in having a
"schematic" serialization.

I know that Spark and I think Flink as well, use Kryo
<https://github.com/EsotericSoftware/kryo> for serialization (to be
accurate it's Chill <https://github.com/twitter/chill> for Spark) and I
found it very impressive even comparing to "manual" serializations,
 i.e., it seems to outperform Spark's "native" Encoders (1.6+) for
primitives..
In addition it clearly supports Java and Scala, and there are 3rd party
libraries for Clojure and Objective-C.

I guess my bottom-line here agrees with Kenneth - performance and
interoperability - but I'm just not sure if schema based serializers are
*always* the fastest.

As for pipeline serialization, since performance is not the main issue, and
I think usability would be very important, I say +1 for JSON.

For anyone who spent sometime on benchmarking serialization libraries, know
is the time to speak up ;)

Thanks,
Amit

On Fri, Jun 17, 2016 at 12:47 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> am I correct in assuming that the transmitted envelopes would mostly
> contain coder-serialized values? If so, wouldn't the header of an envelope
> just be the number of contained bytes and number of values? I'm probably
> missing something but with these assumptions I don't see the benefit of
> using something like Avro/Thrift/Protobuf for serializing the main-input
> value envelopes. We would just need a system that can send byte data really
> fast between languages/VMs.
>
> By the way, another interesting question (at least for me) is how other
> data, such as side-inputs, is going to arrive at the DoFn if we want to
> support a general interface for different languages.
>
> Cheers,
> Aljoscha
>
> On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <kl...@google.com.invalid>
> wrote:
>
> > (Apologies for the formatting)
> >
> > On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <kl...@google.com>
> wrote:
> >
> > > Hello everyone!
> > >
> > > We are busily working on a Runner API (for building and transmitting
> > > pipelines)
> > > and a Fn API (for invoking user-defined functions found within
> pipelines)
> > > as
> > > outlined in the Beam technical vision [1]. Both of these require a
> > > language-independent serialization technology for interoperability
> > between
> > > SDKs
> > > and runners.
> > >
> > > The Fn API includes a high-bandwidth data plane where bundles are
> > > transmitted
> > > via some serialization/RPC envelope (inside the envelope, the stream of
> > > elements is encoded with a coder) to transfer bundles between the
> runner
> > > and
> > > the SDK, so performance is extremely important. There are many choices
> > for
> > > high
> > > performance serialization, and we would like to start the conversation
> > > about
> > > what serialization technology is best for Beam.
> > >
> > > The goal of this discussion is to arrive at consensus on the question:
> > > What
> > > serialization technology should we use for the data plane envelope of
> the
> > > Fn
> > > API?
> > >
> > > To facilitate community discussion, we looked at the available
> > > technologies and
> > > tried to narrow the choices based on three criteria:
> > >
> > >  - Performance: What is the size of serialized data? How do we expect
> the
> > >    technology to affect pipeline speed and cost? etc
> > >
> > >  - Language support: Does the technology support the most widespread
> > > language
> > >    for data processing? Does it have a vibrant ecosystem of contributed
> > >    language bindings? etc
> > >
> > >  - Community: What is the adoption of the technology? How mature is it?
> > > How
> > >    active is development? How is the documentation? etc
> > >
> > > Given these criteria, we came up with four technologies that are good
> > > contenders. All have similar & adequate schema capabilities.
> > >
> > >  - Apache Avro: Does not require code gen, but embedding the schema in
> > the
> > > data
> > >    could be an issue. Very popular.
> > >
> > >  - Apache Thrift: Probably a bit faster and compact than Avro. A huge
> > > number of
> > >    language supported.
> > >
> > >  - Protocol Buffers 3: Incorporates the lessons that Google has learned
> > > through
> > >    long-term use of Protocol Buffers.
> > >
> > >  - FlatBuffers: Some benchmarks imply great performance from the
> > zero-copy
> > > mmap
> > >    idea. We would need to run representative experiments.
> > >
> > > I want to emphasize that this is a community decision, and this thread
> is
> > > just
> > > the conversation starter for us all to weigh in. We just wanted to do
> > some
> > > legwork to focus the discussion if we could.
> > >
> > > And there's a minor follow-up question: Once we settle here, is that
> > > technology
> > > also suitable for the low-bandwidth Runner API for defining pipelines,
> or
> > > does
> > > anyone think we need to consider a second technology (like JSON) for
> > > usability
> > > reasons?
> > >
> > > [1]
> > >
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> > >
> > >
> >
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
am I correct in assuming that the transmitted envelopes would mostly
contain coder-serialized values? If so, wouldn't the header of an envelope
just be the number of contained bytes and number of values? I'm probably
missing something but with these assumptions I don't see the benefit of
using something like Avro/Thrift/Protobuf for serializing the main-input
value envelopes. We would just need a system that can send byte data really
fast between languages/VMs.

By the way, another interesting question (at least for me) is how other
data, such as side-inputs, is going to arrive at the DoFn if we want to
support a general interface for different languages.

Cheers,
Aljoscha

On Thu, 16 Jun 2016 at 22:33 Kenneth Knowles <kl...@google.com.invalid> wrote:

> (Apologies for the formatting)
>
> On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <kl...@google.com> wrote:
>
> > Hello everyone!
> >
> > We are busily working on a Runner API (for building and transmitting
> > pipelines)
> > and a Fn API (for invoking user-defined functions found within pipelines)
> > as
> > outlined in the Beam technical vision [1]. Both of these require a
> > language-independent serialization technology for interoperability
> between
> > SDKs
> > and runners.
> >
> > The Fn API includes a high-bandwidth data plane where bundles are
> > transmitted
> > via some serialization/RPC envelope (inside the envelope, the stream of
> > elements is encoded with a coder) to transfer bundles between the runner
> > and
> > the SDK, so performance is extremely important. There are many choices
> for
> > high
> > performance serialization, and we would like to start the conversation
> > about
> > what serialization technology is best for Beam.
> >
> > The goal of this discussion is to arrive at consensus on the question:
> > What
> > serialization technology should we use for the data plane envelope of the
> > Fn
> > API?
> >
> > To facilitate community discussion, we looked at the available
> > technologies and
> > tried to narrow the choices based on three criteria:
> >
> >  - Performance: What is the size of serialized data? How do we expect the
> >    technology to affect pipeline speed and cost? etc
> >
> >  - Language support: Does the technology support the most widespread
> > language
> >    for data processing? Does it have a vibrant ecosystem of contributed
> >    language bindings? etc
> >
> >  - Community: What is the adoption of the technology? How mature is it?
> > How
> >    active is development? How is the documentation? etc
> >
> > Given these criteria, we came up with four technologies that are good
> > contenders. All have similar & adequate schema capabilities.
> >
> >  - Apache Avro: Does not require code gen, but embedding the schema in
> the
> > data
> >    could be an issue. Very popular.
> >
> >  - Apache Thrift: Probably a bit faster and compact than Avro. A huge
> > number of
> >    language supported.
> >
> >  - Protocol Buffers 3: Incorporates the lessons that Google has learned
> > through
> >    long-term use of Protocol Buffers.
> >
> >  - FlatBuffers: Some benchmarks imply great performance from the
> zero-copy
> > mmap
> >    idea. We would need to run representative experiments.
> >
> > I want to emphasize that this is a community decision, and this thread is
> > just
> > the conversation starter for us all to weigh in. We just wanted to do
> some
> > legwork to focus the discussion if we could.
> >
> > And there's a minor follow-up question: Once we settle here, is that
> > technology
> > also suitable for the low-bandwidth Runner API for defining pipelines, or
> > does
> > anyone think we need to consider a second technology (like JSON) for
> > usability
> > reasons?
> >
> > [1]
> >
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
> >
> >
>

Re: [DISCUSS] Beam data plane serialization tech

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
(Apologies for the formatting)

On Thu, Jun 16, 2016 at 12:12 PM, Kenneth Knowles <kl...@google.com> wrote:

> Hello everyone!
>
> We are busily working on a Runner API (for building and transmitting
> pipelines)
> and a Fn API (for invoking user-defined functions found within pipelines)
> as
> outlined in the Beam technical vision [1]. Both of these require a
> language-independent serialization technology for interoperability between
> SDKs
> and runners.
>
> The Fn API includes a high-bandwidth data plane where bundles are
> transmitted
> via some serialization/RPC envelope (inside the envelope, the stream of
> elements is encoded with a coder) to transfer bundles between the runner
> and
> the SDK, so performance is extremely important. There are many choices for
> high
> performance serialization, and we would like to start the conversation
> about
> what serialization technology is best for Beam.
>
> The goal of this discussion is to arrive at consensus on the question:
> What
> serialization technology should we use for the data plane envelope of the
> Fn
> API?
>
> To facilitate community discussion, we looked at the available
> technologies and
> tried to narrow the choices based on three criteria:
>
>  - Performance: What is the size of serialized data? How do we expect the
>    technology to affect pipeline speed and cost? etc
>
>  - Language support: Does the technology support the most widespread
> language
>    for data processing? Does it have a vibrant ecosystem of contributed
>    language bindings? etc
>
>  - Community: What is the adoption of the technology? How mature is it?
> How
>    active is development? How is the documentation? etc
>
> Given these criteria, we came up with four technologies that are good
> contenders. All have similar & adequate schema capabilities.
>
>  - Apache Avro: Does not require code gen, but embedding the schema in the
> data
>    could be an issue. Very popular.
>
>  - Apache Thrift: Probably a bit faster and compact than Avro. A huge
> number of
>    language supported.
>
>  - Protocol Buffers 3: Incorporates the lessons that Google has learned
> through
>    long-term use of Protocol Buffers.
>
>  - FlatBuffers: Some benchmarks imply great performance from the zero-copy
> mmap
>    idea. We would need to run representative experiments.
>
> I want to emphasize that this is a community decision, and this thread is
> just
> the conversation starter for us all to weigh in. We just wanted to do some
> legwork to focus the discussion if we could.
>
> And there's a minor follow-up question: Once we settle here, is that
> technology
> also suitable for the low-bandwidth Runner API for defining pipelines, or
> does
> anyone think we need to consider a second technology (like JSON) for
> usability
> reasons?
>
> [1]
> https://docs.google.com/presentation/d/1E9seGPB_VXtY_KZP4HngDPTbsu5RVZFFaTlwEYa88Zw/present?slide=id.g108d3a202f_0_38
>
>