You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Robert Burke <ro...@frantil.com> on 2019/01/03 19:45:11 UTC

[Go SDK] User Defined Coders

One area that the Go SDK currently lacks: is the ability for users to
specify their own coders for types.

I've written a proposal document,
<https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#>
and
while I'm confident about the core, there are certainly some edge cases
that require discussion before getting on with the implementation.

At presently, the SDK only permits primitive value types (all numeric types
but complex, strings, and []bytes) which are coded with beam coders, and
structs whose exported fields are of those type, which is then encoded as
JSON. Protocol buffer support is hacked in to avoid the type anaiyzer, and
presents the current work around this issue.

The high level proposal is to catch up with Python and Java, and have a
coder registry. In addition, arrays, and maps should be permitted as well.

If you have alternatives, or other suggestions and opinions, I'd love to
hear them! Otherwise my intent is to get a PR ready by the end of January.

Thanks!
Robert Burke

Re: [Go SDK] User Defined Coders

Posted by Robert Burke <ro...@frantil.com>.
Schemas allow the runner to know the structure of the data they're
manipulating, so if a value is schema encoded, then the runner can
manipulate it, including selection and aggregation. In essense, it allows a
beam to handle the "common and boring but useful" parts of pipelines
agnostic of an SDK language, but allows the option of the SDK language to
handle more interesting processing.

I suggested as much in the other thread "Schemas in the Go SDK" I
(fruitlessly) tried to fork off for the schema discussion a few days ago.
I'll just copy that back over here. See point *5* below.

Schema's and Beam's codification end up as a short entrypoint DSL for
additional languages to support beam, gaining much power to garner usage
and experience, before supporting more sophisticated Beam features such as
Windowing and State and Time in the language natively.

------------------

*1.* Default behavior to support Schema's in some way doesn't remove the
need for certain specific uses of an atomic coder for a type. eg.
Specifying that Beam shouldn't look further into this type.

TBH the interaction between schema's and coders is the least interesting
part about schemas and matters in precious few circumstances. In
particular, when Grouping By Key, it seems like the schema coder should be
used by default but otherwise, not. Further, there's always the option to
"try the schema" encoding and should that fail, try any existing atomic
coder by default, though this risks data corruption in some situations.

*1.a *In a later beam version, it could be true that there's no need for
such uses. There's always the option to work around anything by writing at
DoFn that accepts a []byte, and then produces a given type. However
decoding []byte and encoding back again seems like a common enough
operation for some domains that having direct beam support in some capacity
is desirable for performance reasons.

*2.* It would be easy enough to have a pipeline fail at construction time
should a type not be able to derive a schema for itself, and it's put into
a schema required scenario.

*3.* The Go SDK does recursive type analysis to be able encode types
<https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/serialize.go#L346>
for
coders anyway, as Go has no native concept of "serializable types" or
"serializable functions" It wouldn't be too much of a stretch to convert
this representation to a Portable Schema representation.

When materializing types, Go has extensively defined Type Conversion rules
<https://golang.org/ref/spec#Conversions> which are accessible via the
reflect package. This means that we can always synthetically create an
instance of a real type from something like a schema, assuming they match
field for field. Eg. If a user declares a PCollection with a given Schema,
then in principle it would be possible to use that PCollection as an input
with a field for field compatible real struct type, and have this verified
at construction time. The "extra sauce" would be to have this happen for a
subset of fields for convenient extraction, ala the annotation use in java.

In particular, this means that whenever the Go SDK is in a scenario that it
doesn't have a schema*, it could probably create one ad-hoc *for that
context, and use the atomic coder the rest of the time if available.
Whether we want it do so is another matter, and probably situation specific.

*4. *It seems Long Term (in that it will be eventually be done, not that it
will necessarily take a long time to get there), that Schemas are likely
the interchange format for Cross Language pipeline support. That is, when
an SDK is invoking a transform in a different language (say, Beam Go
calling on Beam SQL), the values could be specified, and returned in the
schema format, to ensure compatibility. The trick here is that the expected
return schema still needs to be explicitly specified from the user in some
circumstances. (eg. Going from a SQL statement -> Schema doesn't seem like
a natural fit, and won't necessarily be available at pipeline construction
time in the remote language.)

*5.* An interesting aspect of schemas is that they fundamentally enable
SDKs to start with a light DSL layer with "known" types and
transforms/combines/joins, which then never need to be invoked on the SDK
layer. Runners could each implement schemas directly and avoid unnecessary
FnAPI hops for improved performance, largely because they know the type's
structure. No need for any of it to be implemented SDK side to start.

 Overall this is a noble goal in that it enables more languages more
easily, but it's concerning from my view, in that the other goal is to
enable data processing in the SDK language, and this moves it farther away
from the more general, if verbose approaches to do the same thing.

I'm on the side of Scalable Data Processing in Go, which ideally entails
writing Go, rather than an abstract DSL.


I don't speak for all Go users, and welcome hearing from others.


On Tue, 8 Jan 2019 at 12:15 Reuven Lax <re...@google.com> wrote:

> I wonder if we could do this _only_ over the FnApi. The FnApi already does
> batching I believe. What if we made schemas a fundamental part of our
> protos, and had no SchemaCoder. The FnApi could then batch up a bunch of
> rows an encode using Arrow before sending over the wire to the harness.
>
> Of course this still turns back into individual records before it goes
> back to user code. However well-known combiners can be executed directly in
> the harness, which means aggregations like "sum a field" can be run inside
> the harness over the columnar data. Moving these combiners into the harness
> might itself be a huge perf benefit for Python, as we could then run them
> in a more-performant language.
>
> Reuven
>
> On Tue, Jan 8, 2019 at 7:44 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Tue, Jan 8, 2019 at 4:32 PM Reuven Lax <re...@google.com> wrote:
>> >
>> > I agree with this, but I think it's a significant rethinking of Beam
>> that I didn't want to couple to schemas. In addition to rethinking the API,
>> it might also require rethinking all of our runners.
>>
>> We're already marshaling (including batching) data over the FnApi, so
>> it might not be that big of a change. Also, the choice of encoding
>> over the data channel is already parametrizable via a coder, so it's
>> easy to make this an optional feature that runners and SDKs can opt
>> into. I agree that we don't want to couple it to schemas (though
>> that's where it becomes even more useful).
>>
>> > Also while columnar can be a large perf win, I suspect that we
>> currently have lower-hanging fruit to optimize when it comes to performance.
>>
>> It's probably a bigger win for Python than for Java.
>>
>> >
>> > Reuven
>> >
>> > On Tue, Jan 8, 2019 at 5:25 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >> On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
>> >> >
>> >> > I looked at Apache Arrow as a potential serialization format for Row
>> coders. At the time it didn't seem a perfect fit - Beam's programming model
>> is record-at-a-time, and Arrow is optimized for large batches of records
>> (while Beam has a concept of "bundles" they are completely non
>> deterministic, and records might bundle different on retry). You could use
>> Arrow with single-record batches, but I suspect that would end up adding a
>> lot of extra overhead. That being said, I think it's still something worth
>> investigating further.
>> >>
>> >> Though Beam's model is row-oriented, I think it'd make a lot of sense
>> >> to support column-oriented transfer of data across the data plane
>> >> (we're already concatenating serialized records lengthwise), with
>> >> Arrow as a first candidate, and (either as part of the public API or
>> >> as an implementation detail) columnar processing as well (e.g.
>> >> projections, maps, filters, and aggregations can often be done more
>> >> efficiently in a columnar fashion). While this is often a significant
>> >> win in C++ (and presumably Java), it's essential for doing
>> >> high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
>> >> Tensorflow, ... all have batch-oriented APIs and avoid representing
>> >> records as individual objects, something we'll need to tackle for
>> >> BeamPython at least).
>> >>
>> >> >
>> >> > Reuven
>> >> >
>> >> >
>> >> >
>> >> > On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com>
>> wrote:
>> >> >>
>> >> >> Reuven, it sounds great. I see there is a similar thing to Row
>> coders happening in Apache Arrow, and there is a similarity between Apache
>> Arrow Flight and data exchange service in portability. How do you see these
>> two things relate to each other in the long term?
>> >> >>
>> >> >> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com>
>> wrote:
>> >> >>>
>> >> >>> The biggest advantage is actually readability and usability. A
>> secondary advantage is that it means that Go will be able to interact
>> seamlessly with BeamSQL, which would be a big win for Go.
>> >> >>>
>> >> >>> A schema is basically a way of saying that a record has a specific
>> set of (possibly nested, possibly repeated) fields. So for instance let's
>> say that the user's type is a struct with fields named user, country,
>> purchaseCost. This allows us to provide transforms that operate on field
>> names. Some example (using the Java API):
>> >> >>>
>> >> >>> PCollection users = events.apply(Select.fields("user"));  //
>> Select out only the user field.
>> >> >>>
>> >> >>> PCollection joinedEvents =
>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>> PCollections by user.
>> >> >>>
>> >> >>> // For each country, calculate the total purchase cost as well as
>> the top 10 purchases.
>> >> >>> // A new schema is created containing fields total_cost and
>> top_purchases, and rows are created with the aggregation results.
>> >> >>> PCollection purchaseStatistics = events.apply(
>> >> >>>     Group.byFieldNames("country")
>> >> >>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>> "total_cost"))
>> >> >>>                 .aggregateField("purchaseCost",
>> Top.largestLongs(10), "top_purchases"))
>> >> >>>
>> >> >>>
>> >> >>> This is far more readable than what we have today, and what
>> unlocks this is that Beam actually knows the structure of the record
>> instead of assuming records are uncrackable blobs.
>> >> >>>
>> >> >>> Note that a coder is basically a special case of a schema that has
>> a single field.
>> >> >>>
>> >> >>> In BeamJava we have a SchemaRegistry which knows how to turn user
>> types into schemas. We use reflection to analyze many user types (e.g.
>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>> etc.) to determine the schema, however this is done only when the graph is
>> initially generated. We do use code generation (in Java we do bytecode
>> generation) to make this somewhat more efficient. I'm willing to bet that
>> the code generator you've written for structs could be very easily modified
>> for schemas instead, so it would not be wasted work if we went with schemas.
>> >> >>>
>> >> >>> One of the things I'm working on now is documenting Beam schemas.
>> They are already very powerful and useful, but since there is still nothing
>> in our documentation about them, they are not yet widely used. I expect to
>> finish draft documentation by the end of January.
>> >> >>>
>> >> >>> Reuven
>> >> >>>
>> >> >>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com>
>> wrote:
>> >> >>>>
>> >> >>>> That's an interesting idea. I must confess I don't rightly know
>> the difference between a schema and coder, but here's what I've got with a
>> bit of searching through memory and the mailing list. Please let me know if
>> I'm off track.
>> >> >>>>
>> >> >>>> As near as I can tell, a schema, as far as Beam takes it is a
>> mechanism to define what data is extracted from a given row of data. So in
>> principle, there's an opportunity to be more efficient with data with many
>> columns that aren't being used, and only extract the data that's meaningful
>> to the pipeline.
>> >> >>>> The trick then is how to apply the schema to a given
>> serialization format, which is something I'm missing in my mental model
>> (and then how to do it efficiently in Go).
>> >> >>>>
>> >> >>>> I do know that the Go client package for BigQuery does something
>> like that, using field tags. Similarly, the "encoding/json" package in the
>> Go Standard Library permits annotating fields and it will read out and
>> deserialize the JSON fields and that's it.
>> >> >>>>
>> >> >>>> A concern I have is that Go (at present) would require
>> pre-compile time code generation for schemas to be efficient, and they
>> would still mostly boil down to turning []bytes into real structs. Go
>> reflection doesn't keep up.
>> >> >>>> Go has no mechanism I'm aware of to Just In Time compile more
>> efficient processing of values.
>> >> >>>> It's also not 100% clear how Schema's would play with protocol
>> buffers or similar.
>> >> >>>> BigQuery has a mechanism of generating a JSON schema from a proto
>> file, but that's only the specification half, not the using half.
>> >> >>>>
>> >> >>>> As it stands, the code generator I've been building these last
>> months could (in principle) statically analyze a user's struct, and then
>> generate an efficient dedicated coder for it. It just has no where to put
>> them such that the Go SDK would use it.
>> >> >>>>
>> >> >>>>
>> >> >>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com>
>> wrote:
>> >> >>>>>
>> >> >>>>> I'll make a different suggestion. There's been some chatter that
>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>> schemas the basic semantics instead of coders. Schemas provide everything a
>> coder provides, but also allows for far more readable code. We can't make
>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>> we're better off starting with schemas instead of coders?
>> >> >>>>>
>> >> >>>>> Reuven
>> >> >>>>>
>> >> >>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>> wrote:
>> >> >>>>>>
>> >> >>>>>> One area that the Go SDK currently lacks: is the ability for
>> users to specify their own coders for types.
>> >> >>>>>>
>> >> >>>>>> I've written a proposal document, and while I'm confident about
>> the core, there are certainly some edge cases that require discussion
>> before getting on with the implementation.
>> >> >>>>>>
>> >> >>>>>> At presently, the SDK only permits primitive value types (all
>> numeric types but complex, strings, and []bytes) which are coded with beam
>> coders, and structs whose exported fields are of those type, which is then
>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>> anaiyzer, and presents the current work around this issue.
>> >> >>>>>>
>> >> >>>>>> The high level proposal is to catch up with Python and Java,
>> and have a coder registry. In addition, arrays, and maps should be
>> permitted as well.
>> >> >>>>>>
>> >> >>>>>> If you have alternatives, or other suggestions and opinions,
>> I'd love to hear them! Otherwise my intent is to get a PR ready by the end
>> of January.
>> >> >>>>>>
>> >> >>>>>> Thanks!
>> >> >>>>>> Robert Burke
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>> --
>> >> >>>> http://go/where-is-rebo
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> Cheers,
>> >> >> Gleb
>>
>

Re: [Go SDK] User Defined Coders

Posted by Robert Burke <ro...@frantil.com>.
I've updated the design doc
<https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#>
with
a section on schemas. Interestingly, the lack of Generics in Go ends up
being very handy. No incompatibility between converting from a concrete
type, and it's Schema equivalent.
The main question on "infering" a coder can come from using them by
default, but also forcing a PCollection to be it's Schema equivalent, which
would be used.

There's then the other difficulties around the actual implementation, but
at this point I don't see user code interacting *directly* with the
recursive schema definition, but instead relying on implied conversions to
concrete types, which are easier to manipulate for the user.

To get performance, each type could have a specific a T -> Schema Schema ->
T generated for it. There are some semantics to work out there, but they
don't touch coders, so I'm not going deep into them now. In particular,
these would be necessary for converting a PCollection<Schema> to
PCollection<Concrete> for use in DoFns.

The Schema type is largely useful for the framework to manipulate types, so
in those cases, using the schema coder is obvious. For everything else, it
wouldn't be too bad to provide a ConvertToSchemaType transform from
PCollection to PCollection, which would effectively change the PCollection
to use the Schema Coding if that PCollection is being sent to a cross
language sink that requires schema coded values. Cross language sources
would already require explicitly pointing out the type to use, and for that
a user could provide an explicit Schema value (of whatever we implement it
as) to force correct decoding.

Overall, given that schemas *are not yet* in the FnAPI, and I can't
currently find any insurmountable issues between the proposal and schemas,
I'm going to start a PR for this.
Cheers,
Robert Burke

PS. I've added a link to the doc and the other Go specific ones to the
Technical/Design
Doc page of the wiki
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=95653903> to
make them easier to find.


On Wed, 9 Jan 2019 at 07:40, Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Jan 8, 2019 at 9:15 PM Reuven Lax <re...@google.com> wrote:
> >
> > I wonder if we could do this _only_ over the FnApi. The FnApi already
> does batching I believe. What if we made schemas a fundamental part of our
> protos, and had no SchemaCoder.
>
> One advantage of SchemaCoders is that they allow nesting inside other
> coders.
>
> Schemas are not (I don't think) a replacement for coders at the
> implementation level, but in the user API they obviate the need for
> most users to interact with coders (as well as providing a richer,
> language-independent way of describing elements in most cases).
>
> > The FnApi could then batch up a bunch of rows an encode using Arrow
> before sending over the wire to the harness.
>
> The encoding of records across the FnAPI can be more expressive than
> Coders, regardless of schemas.
>
> > Of course this still turns back into individual records before it goes
> back to user code. However well-known combiners can be executed directly in
> the harness, which means aggregations like "sum a field" can be run inside
> the harness over the columnar data. Moving these combiners into the harness
> might itself be a huge perf benefit for Python, as we could then run them
> in a more-performant language.
>
> You don't even have to move them out of Python to take advantage of
> this if you're using the right libraries and have the right
> representation. If you do move them it doesn't have to be into the
> harness, it could be an adjacent SDK. I envision a large suite of
> known URNs that can be placed wherever it's best.
>
>
>
> On Tue, Jan 8, 2019 at 7:12 PM Kenneth Knowles <ke...@apache.org> wrote:
> >
> > And even more for SQL, or about the same since you are referring to
> DataFrames which have roughly relational semantics. Having the columnar
> model all the way to the data source would be big. Having near-zero-parsing
> cost for transmitted data would be big. These changes would make Beam a
> rather different project.
>
> I think beam would be qualitatively the same project, but it would
> open up a lot of areas for optimization (both in the "computer
> resource" sense and "easy for people to get stuff done" sense).
>
>
> > Reuven
> >
> > On Tue, Jan 8, 2019 at 7:44 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> On Tue, Jan 8, 2019 at 4:32 PM Reuven Lax <re...@google.com> wrote:
> >> >
> >> > I agree with this, but I think it's a significant rethinking of Beam
> that I didn't want to couple to schemas. In addition to rethinking the API,
> it might also require rethinking all of our runners.
> >>
> >> We're already marshaling (including batching) data over the FnApi, so
> >> it might not be that big of a change. Also, the choice of encoding
> >> over the data channel is already parametrizable via a coder, so it's
> >> easy to make this an optional feature that runners and SDKs can opt
> >> into. I agree that we don't want to couple it to schemas (though
> >> that's where it becomes even more useful).
> >>
> >> > Also while columnar can be a large perf win, I suspect that we
> currently have lower-hanging fruit to optimize when it comes to performance.
> >>
> >> It's probably a bigger win for Python than for Java.
> >>
> >> >
> >> > Reuven
> >> >
> >> > On Tue, Jan 8, 2019 at 5:25 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >>
> >> >> On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
> >> >> >
> >> >> > I looked at Apache Arrow as a potential serialization format for
> Row coders. At the time it didn't seem a perfect fit - Beam's programming
> model is record-at-a-time, and Arrow is optimized for large batches of
> records (while Beam has a concept of "bundles" they are completely non
> deterministic, and records might bundle different on retry). You could use
> Arrow with single-record batches, but I suspect that would end up adding a
> lot of extra overhead. That being said, I think it's still something worth
> investigating further.
> >> >>
> >> >> Though Beam's model is row-oriented, I think it'd make a lot of sense
> >> >> to support column-oriented transfer of data across the data plane
> >> >> (we're already concatenating serialized records lengthwise), with
> >> >> Arrow as a first candidate, and (either as part of the public API or
> >> >> as an implementation detail) columnar processing as well (e.g.
> >> >> projections, maps, filters, and aggregations can often be done more
> >> >> efficiently in a columnar fashion). While this is often a significant
> >> >> win in C++ (and presumably Java), it's essential for doing
> >> >> high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
> >> >> Tensorflow, ... all have batch-oriented APIs and avoid representing
> >> >> records as individual objects, something we'll need to tackle for
> >> >> BeamPython at least).
> >> >>
> >> >> >
> >> >> > Reuven
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com>
> wrote:
> >> >> >>
> >> >> >> Reuven, it sounds great. I see there is a similar thing to Row
> coders happening in Apache Arrow, and there is a similarity between Apache
> Arrow Flight and data exchange service in portability. How do you see these
> two things relate to each other in the long term?
> >> >> >>
> >> >> >> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com>
> wrote:
> >> >> >>>
> >> >> >>> The biggest advantage is actually readability and usability. A
> secondary advantage is that it means that Go will be able to interact
> seamlessly with BeamSQL, which would be a big win for Go.
> >> >> >>>
> >> >> >>> A schema is basically a way of saying that a record has a
> specific set of (possibly nested, possibly repeated) fields. So for
> instance let's say that the user's type is a struct with fields named user,
> country, purchaseCost. This allows us to provide transforms that operate on
> field names. Some example (using the Java API):
> >> >> >>>
> >> >> >>> PCollection users = events.apply(Select.fields("user"));  //
> Select out only the user field.
> >> >> >>>
> >> >> >>> PCollection joinedEvents =
> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
> PCollections by user.
> >> >> >>>
> >> >> >>> // For each country, calculate the total purchase cost as well
> as the top 10 purchases.
> >> >> >>> // A new schema is created containing fields total_cost and
> top_purchases, and rows are created with the aggregation results.
> >> >> >>> PCollection purchaseStatistics = events.apply(
> >> >> >>>     Group.byFieldNames("country")
> >> >> >>>                .aggregateField("purchaseCost", Sum.ofLongs(),
> "total_cost"))
> >> >> >>>                 .aggregateField("purchaseCost",
> Top.largestLongs(10), "top_purchases"))
> >> >> >>>
> >> >> >>>
> >> >> >>> This is far more readable than what we have today, and what
> unlocks this is that Beam actually knows the structure of the record
> instead of assuming records are uncrackable blobs.
> >> >> >>>
> >> >> >>> Note that a coder is basically a special case of a schema that
> has a single field.
> >> >> >>>
> >> >> >>> In BeamJava we have a SchemaRegistry which knows how to turn
> user types into schemas. We use reflection to analyze many user types (e.g.
> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
> etc.) to determine the schema, however this is done only when the graph is
> initially generated. We do use code generation (in Java we do bytecode
> generation) to make this somewhat more efficient. I'm willing to bet that
> the code generator you've written for structs could be very easily modified
> for schemas instead, so it would not be wasted work if we went with schemas.
> >> >> >>>
> >> >> >>> One of the things I'm working on now is documenting Beam
> schemas. They are already very powerful and useful, but since there is
> still nothing in our documentation about them, they are not yet widely
> used. I expect to finish draft documentation by the end of January.
> >> >> >>>
> >> >> >>> Reuven
> >> >> >>>
> >> >> >>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com>
> wrote:
> >> >> >>>>
> >> >> >>>> That's an interesting idea. I must confess I don't rightly know
> the difference between a schema and coder, but here's what I've got with a
> bit of searching through memory and the mailing list. Please let me know if
> I'm off track.
> >> >> >>>>
> >> >> >>>> As near as I can tell, a schema, as far as Beam takes it is a
> mechanism to define what data is extracted from a given row of data. So in
> principle, there's an opportunity to be more efficient with data with many
> columns that aren't being used, and only extract the data that's meaningful
> to the pipeline.
> >> >> >>>> The trick then is how to apply the schema to a given
> serialization format, which is something I'm missing in my mental model
> (and then how to do it efficiently in Go).
> >> >> >>>>
> >> >> >>>> I do know that the Go client package for BigQuery does
> something like that, using field tags. Similarly, the "encoding/json"
> package in the Go Standard Library permits annotating fields and it will
> read out and deserialize the JSON fields and that's it.
> >> >> >>>>
> >> >> >>>> A concern I have is that Go (at present) would require
> pre-compile time code generation for schemas to be efficient, and they
> would still mostly boil down to turning []bytes into real structs. Go
> reflection doesn't keep up.
> >> >> >>>> Go has no mechanism I'm aware of to Just In Time compile more
> efficient processing of values.
> >> >> >>>> It's also not 100% clear how Schema's would play with protocol
> buffers or similar.
> >> >> >>>> BigQuery has a mechanism of generating a JSON schema from a
> proto file, but that's only the specification half, not the using half.
> >> >> >>>>
> >> >> >>>> As it stands, the code generator I've been building these last
> months could (in principle) statically analyze a user's struct, and then
> generate an efficient dedicated coder for it. It just has no where to put
> them such that the Go SDK would use it.
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com>
> wrote:
> >> >> >>>>>
> >> >> >>>>> I'll make a different suggestion. There's been some chatter
> that schemas are a better tool than coders, and that in Beam 3.0 we should
> make schemas the basic semantics instead of coders. Schemas provide
> everything a coder provides, but also allows for far more readable code. We
> can't make such a change in Beam Java 2.X for compatibility reasons, but
> maybe in Go we're better off starting with schemas instead of coders?
> >> >> >>>>>
> >> >> >>>>> Reuven
> >> >> >>>>>
> >> >> >>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <
> robert@frantil.com> wrote:
> >> >> >>>>>>
> >> >> >>>>>> One area that the Go SDK currently lacks: is the ability for
> users to specify their own coders for types.
> >> >> >>>>>>
> >> >> >>>>>> I've written a proposal document, and while I'm confident
> about the core, there are certainly some edge cases that require discussion
> before getting on with the implementation.
> >> >> >>>>>>
> >> >> >>>>>> At presently, the SDK only permits primitive value types (all
> numeric types but complex, strings, and []bytes) which are coded with beam
> coders, and structs whose exported fields are of those type, which is then
> encoded as JSON. Protocol buffer support is hacked in to avoid the type
> anaiyzer, and presents the current work around this issue.
> >> >> >>>>>>
> >> >> >>>>>> The high level proposal is to catch up with Python and Java,
> and have a coder registry. In addition, arrays, and maps should be
> permitted as well.
> >> >> >>>>>>
> >> >> >>>>>> If you have alternatives, or other suggestions and opinions,
> I'd love to hear them! Otherwise my intent is to get a PR ready by the end
> of January.
> >> >> >>>>>>
> >> >> >>>>>> Thanks!
> >> >> >>>>>> Robert Burke
> >> >> >>>>
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> --
> >> >> >>>> http://go/where-is-rebo
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >> Cheers,
> >> >> >> Gleb
>

Re: [Go SDK] User Defined Coders

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Jan 8, 2019 at 9:15 PM Reuven Lax <re...@google.com> wrote:
>
> I wonder if we could do this _only_ over the FnApi. The FnApi already does batching I believe. What if we made schemas a fundamental part of our protos, and had no SchemaCoder.

One advantage of SchemaCoders is that they allow nesting inside other coders.

Schemas are not (I don't think) a replacement for coders at the
implementation level, but in the user API they obviate the need for
most users to interact with coders (as well as providing a richer,
language-independent way of describing elements in most cases).

> The FnApi could then batch up a bunch of rows an encode using Arrow before sending over the wire to the harness.

The encoding of records across the FnAPI can be more expressive than
Coders, regardless of schemas.

> Of course this still turns back into individual records before it goes back to user code. However well-known combiners can be executed directly in the harness, which means aggregations like "sum a field" can be run inside the harness over the columnar data. Moving these combiners into the harness might itself be a huge perf benefit for Python, as we could then run them in a more-performant language.

You don't even have to move them out of Python to take advantage of
this if you're using the right libraries and have the right
representation. If you do move them it doesn't have to be into the
harness, it could be an adjacent SDK. I envision a large suite of
known URNs that can be placed wherever it's best.



On Tue, Jan 8, 2019 at 7:12 PM Kenneth Knowles <ke...@apache.org> wrote:
>
> And even more for SQL, or about the same since you are referring to DataFrames which have roughly relational semantics. Having the columnar model all the way to the data source would be big. Having near-zero-parsing cost for transmitted data would be big. These changes would make Beam a rather different project.

I think beam would be qualitatively the same project, but it would
open up a lot of areas for optimization (both in the "computer
resource" sense and "easy for people to get stuff done" sense).


> Reuven
>
> On Tue, Jan 8, 2019 at 7:44 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> On Tue, Jan 8, 2019 at 4:32 PM Reuven Lax <re...@google.com> wrote:
>> >
>> > I agree with this, but I think it's a significant rethinking of Beam that I didn't want to couple to schemas. In addition to rethinking the API, it might also require rethinking all of our runners.
>>
>> We're already marshaling (including batching) data over the FnApi, so
>> it might not be that big of a change. Also, the choice of encoding
>> over the data channel is already parametrizable via a coder, so it's
>> easy to make this an optional feature that runners and SDKs can opt
>> into. I agree that we don't want to couple it to schemas (though
>> that's where it becomes even more useful).
>>
>> > Also while columnar can be a large perf win, I suspect that we currently have lower-hanging fruit to optimize when it comes to performance.
>>
>> It's probably a bigger win for Python than for Java.
>>
>> >
>> > Reuven
>> >
>> > On Tue, Jan 8, 2019 at 5:25 AM Robert Bradshaw <ro...@google.com> wrote:
>> >>
>> >> On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
>> >> >
>> >> > I looked at Apache Arrow as a potential serialization format for Row coders. At the time it didn't seem a perfect fit - Beam's programming model is record-at-a-time, and Arrow is optimized for large batches of records (while Beam has a concept of "bundles" they are completely non deterministic, and records might bundle different on retry). You could use Arrow with single-record batches, but I suspect that would end up adding a lot of extra overhead. That being said, I think it's still something worth investigating further.
>> >>
>> >> Though Beam's model is row-oriented, I think it'd make a lot of sense
>> >> to support column-oriented transfer of data across the data plane
>> >> (we're already concatenating serialized records lengthwise), with
>> >> Arrow as a first candidate, and (either as part of the public API or
>> >> as an implementation detail) columnar processing as well (e.g.
>> >> projections, maps, filters, and aggregations can often be done more
>> >> efficiently in a columnar fashion). While this is often a significant
>> >> win in C++ (and presumably Java), it's essential for doing
>> >> high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
>> >> Tensorflow, ... all have batch-oriented APIs and avoid representing
>> >> records as individual objects, something we'll need to tackle for
>> >> BeamPython at least).
>> >>
>> >> >
>> >> > Reuven
>> >> >
>> >> >
>> >> >
>> >> > On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com> wrote:
>> >> >>
>> >> >> Reuven, it sounds great. I see there is a similar thing to Row coders happening in Apache Arrow, and there is a similarity between Apache Arrow Flight and data exchange service in portability. How do you see these two things relate to each other in the long term?
>> >> >>
>> >> >> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>> >> >>>
>> >> >>> The biggest advantage is actually readability and usability. A secondary advantage is that it means that Go will be able to interact seamlessly with BeamSQL, which would be a big win for Go.
>> >> >>>
>> >> >>> A schema is basically a way of saying that a record has a specific set of (possibly nested, possibly repeated) fields. So for instance let's say that the user's type is a struct with fields named user, country, purchaseCost. This allows us to provide transforms that operate on field names. Some example (using the Java API):
>> >> >>>
>> >> >>> PCollection users = events.apply(Select.fields("user"));  // Select out only the user field.
>> >> >>>
>> >> >>> PCollection joinedEvents = queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two PCollections by user.
>> >> >>>
>> >> >>> // For each country, calculate the total purchase cost as well as the top 10 purchases.
>> >> >>> // A new schema is created containing fields total_cost and top_purchases, and rows are created with the aggregation results.
>> >> >>> PCollection purchaseStatistics = events.apply(
>> >> >>>     Group.byFieldNames("country")
>> >> >>>                .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost"))
>> >> >>>                 .aggregateField("purchaseCost", Top.largestLongs(10), "top_purchases"))
>> >> >>>
>> >> >>>
>> >> >>> This is far more readable than what we have today, and what unlocks this is that Beam actually knows the structure of the record instead of assuming records are uncrackable blobs.
>> >> >>>
>> >> >>> Note that a coder is basically a special case of a schema that has a single field.
>> >> >>>
>> >> >>> In BeamJava we have a SchemaRegistry which knows how to turn user types into schemas. We use reflection to analyze many user types (e.g. simple POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to determine the schema, however this is done only when the graph is initially generated. We do use code generation (in Java we do bytecode generation) to make this somewhat more efficient. I'm willing to bet that the code generator you've written for structs could be very easily modified for schemas instead, so it would not be wasted work if we went with schemas.
>> >> >>>
>> >> >>> One of the things I'm working on now is documenting Beam schemas. They are already very powerful and useful, but since there is still nothing in our documentation about them, they are not yet widely used. I expect to finish draft documentation by the end of January.
>> >> >>>
>> >> >>> Reuven
>> >> >>>
>> >> >>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>> >> >>>>
>> >> >>>> That's an interesting idea. I must confess I don't rightly know the difference between a schema and coder, but here's what I've got with a bit of searching through memory and the mailing list. Please let me know if I'm off track.
>> >> >>>>
>> >> >>>> As near as I can tell, a schema, as far as Beam takes it is a mechanism to define what data is extracted from a given row of data. So in principle, there's an opportunity to be more efficient with data with many columns that aren't being used, and only extract the data that's meaningful to the pipeline.
>> >> >>>> The trick then is how to apply the schema to a given serialization format, which is something I'm missing in my mental model (and then how to do it efficiently in Go).
>> >> >>>>
>> >> >>>> I do know that the Go client package for BigQuery does something like that, using field tags. Similarly, the "encoding/json" package in the Go Standard Library permits annotating fields and it will read out and deserialize the JSON fields and that's it.
>> >> >>>>
>> >> >>>> A concern I have is that Go (at present) would require pre-compile time code generation for schemas to be efficient, and they would still mostly boil down to turning []bytes into real structs. Go reflection doesn't keep up.
>> >> >>>> Go has no mechanism I'm aware of to Just In Time compile more efficient processing of values.
>> >> >>>> It's also not 100% clear how Schema's would play with protocol buffers or similar.
>> >> >>>> BigQuery has a mechanism of generating a JSON schema from a proto file, but that's only the specification half, not the using half.
>> >> >>>>
>> >> >>>> As it stands, the code generator I've been building these last months could (in principle) statically analyze a user's struct, and then generate an efficient dedicated coder for it. It just has no where to put them such that the Go SDK would use it.
>> >> >>>>
>> >> >>>>
>> >> >>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>> >> >>>>>
>> >> >>>>> I'll make a different suggestion. There's been some chatter that schemas are a better tool than coders, and that in Beam 3.0 we should make schemas the basic semantics instead of coders. Schemas provide everything a coder provides, but also allows for far more readable code. We can't make such a change in Beam Java 2.X for compatibility reasons, but maybe in Go we're better off starting with schemas instead of coders?
>> >> >>>>>
>> >> >>>>> Reuven
>> >> >>>>>
>> >> >>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>> >> >>>>>>
>> >> >>>>>> One area that the Go SDK currently lacks: is the ability for users to specify their own coders for types.
>> >> >>>>>>
>> >> >>>>>> I've written a proposal document, and while I'm confident about the core, there are certainly some edge cases that require discussion before getting on with the implementation.
>> >> >>>>>>
>> >> >>>>>> At presently, the SDK only permits primitive value types (all numeric types but complex, strings, and []bytes) which are coded with beam coders, and structs whose exported fields are of those type, which is then encoded as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer, and presents the current work around this issue.
>> >> >>>>>>
>> >> >>>>>> The high level proposal is to catch up with Python and Java, and have a coder registry. In addition, arrays, and maps should be permitted as well.
>> >> >>>>>>
>> >> >>>>>> If you have alternatives, or other suggestions and opinions, I'd love to hear them! Otherwise my intent is to get a PR ready by the end of January.
>> >> >>>>>>
>> >> >>>>>> Thanks!
>> >> >>>>>> Robert Burke
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>> --
>> >> >>>> http://go/where-is-rebo
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> Cheers,
>> >> >> Gleb

Re: [Go SDK] User Defined Coders

Posted by Reuven Lax <re...@google.com>.
I wonder if we could do this _only_ over the FnApi. The FnApi already does
batching I believe. What if we made schemas a fundamental part of our
protos, and had no SchemaCoder. The FnApi could then batch up a bunch of
rows an encode using Arrow before sending over the wire to the harness.

Of course this still turns back into individual records before it goes back
to user code. However well-known combiners can be executed directly in the
harness, which means aggregations like "sum a field" can be run inside the
harness over the columnar data. Moving these combiners into the harness
might itself be a huge perf benefit for Python, as we could then run them
in a more-performant language.

Reuven

On Tue, Jan 8, 2019 at 7:44 AM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Jan 8, 2019 at 4:32 PM Reuven Lax <re...@google.com> wrote:
> >
> > I agree with this, but I think it's a significant rethinking of Beam
> that I didn't want to couple to schemas. In addition to rethinking the API,
> it might also require rethinking all of our runners.
>
> We're already marshaling (including batching) data over the FnApi, so
> it might not be that big of a change. Also, the choice of encoding
> over the data channel is already parametrizable via a coder, so it's
> easy to make this an optional feature that runners and SDKs can opt
> into. I agree that we don't want to couple it to schemas (though
> that's where it becomes even more useful).
>
> > Also while columnar can be a large perf win, I suspect that we currently
> have lower-hanging fruit to optimize when it comes to performance.
>
> It's probably a bigger win for Python than for Java.
>
> >
> > Reuven
> >
> > On Tue, Jan 8, 2019 at 5:25 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
> >> >
> >> > I looked at Apache Arrow as a potential serialization format for Row
> coders. At the time it didn't seem a perfect fit - Beam's programming model
> is record-at-a-time, and Arrow is optimized for large batches of records
> (while Beam has a concept of "bundles" they are completely non
> deterministic, and records might bundle different on retry). You could use
> Arrow with single-record batches, but I suspect that would end up adding a
> lot of extra overhead. That being said, I think it's still something worth
> investigating further.
> >>
> >> Though Beam's model is row-oriented, I think it'd make a lot of sense
> >> to support column-oriented transfer of data across the data plane
> >> (we're already concatenating serialized records lengthwise), with
> >> Arrow as a first candidate, and (either as part of the public API or
> >> as an implementation detail) columnar processing as well (e.g.
> >> projections, maps, filters, and aggregations can often be done more
> >> efficiently in a columnar fashion). While this is often a significant
> >> win in C++ (and presumably Java), it's essential for doing
> >> high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
> >> Tensorflow, ... all have batch-oriented APIs and avoid representing
> >> records as individual objects, something we'll need to tackle for
> >> BeamPython at least).
> >>
> >> >
> >> > Reuven
> >> >
> >> >
> >> >
> >> > On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com>
> wrote:
> >> >>
> >> >> Reuven, it sounds great. I see there is a similar thing to Row
> coders happening in Apache Arrow, and there is a similarity between Apache
> Arrow Flight and data exchange service in portability. How do you see these
> two things relate to each other in the long term?
> >> >>
> >> >> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
> >> >>>
> >> >>> The biggest advantage is actually readability and usability. A
> secondary advantage is that it means that Go will be able to interact
> seamlessly with BeamSQL, which would be a big win for Go.
> >> >>>
> >> >>> A schema is basically a way of saying that a record has a specific
> set of (possibly nested, possibly repeated) fields. So for instance let's
> say that the user's type is a struct with fields named user, country,
> purchaseCost. This allows us to provide transforms that operate on field
> names. Some example (using the Java API):
> >> >>>
> >> >>> PCollection users = events.apply(Select.fields("user"));  // Select
> out only the user field.
> >> >>>
> >> >>> PCollection joinedEvents =
> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
> PCollections by user.
> >> >>>
> >> >>> // For each country, calculate the total purchase cost as well as
> the top 10 purchases.
> >> >>> // A new schema is created containing fields total_cost and
> top_purchases, and rows are created with the aggregation results.
> >> >>> PCollection purchaseStatistics = events.apply(
> >> >>>     Group.byFieldNames("country")
> >> >>>                .aggregateField("purchaseCost", Sum.ofLongs(),
> "total_cost"))
> >> >>>                 .aggregateField("purchaseCost",
> Top.largestLongs(10), "top_purchases"))
> >> >>>
> >> >>>
> >> >>> This is far more readable than what we have today, and what unlocks
> this is that Beam actually knows the structure of the record instead of
> assuming records are uncrackable blobs.
> >> >>>
> >> >>> Note that a coder is basically a special case of a schema that has
> a single field.
> >> >>>
> >> >>> In BeamJava we have a SchemaRegistry which knows how to turn user
> types into schemas. We use reflection to analyze many user types (e.g.
> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
> etc.) to determine the schema, however this is done only when the graph is
> initially generated. We do use code generation (in Java we do bytecode
> generation) to make this somewhat more efficient. I'm willing to bet that
> the code generator you've written for structs could be very easily modified
> for schemas instead, so it would not be wasted work if we went with schemas.
> >> >>>
> >> >>> One of the things I'm working on now is documenting Beam schemas.
> They are already very powerful and useful, but since there is still nothing
> in our documentation about them, they are not yet widely used. I expect to
> finish draft documentation by the end of January.
> >> >>>
> >> >>> Reuven
> >> >>>
> >> >>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com>
> wrote:
> >> >>>>
> >> >>>> That's an interesting idea. I must confess I don't rightly know
> the difference between a schema and coder, but here's what I've got with a
> bit of searching through memory and the mailing list. Please let me know if
> I'm off track.
> >> >>>>
> >> >>>> As near as I can tell, a schema, as far as Beam takes it is a
> mechanism to define what data is extracted from a given row of data. So in
> principle, there's an opportunity to be more efficient with data with many
> columns that aren't being used, and only extract the data that's meaningful
> to the pipeline.
> >> >>>> The trick then is how to apply the schema to a given serialization
> format, which is something I'm missing in my mental model (and then how to
> do it efficiently in Go).
> >> >>>>
> >> >>>> I do know that the Go client package for BigQuery does something
> like that, using field tags. Similarly, the "encoding/json" package in the
> Go Standard Library permits annotating fields and it will read out and
> deserialize the JSON fields and that's it.
> >> >>>>
> >> >>>> A concern I have is that Go (at present) would require pre-compile
> time code generation for schemas to be efficient, and they would still
> mostly boil down to turning []bytes into real structs. Go reflection
> doesn't keep up.
> >> >>>> Go has no mechanism I'm aware of to Just In Time compile more
> efficient processing of values.
> >> >>>> It's also not 100% clear how Schema's would play with protocol
> buffers or similar.
> >> >>>> BigQuery has a mechanism of generating a JSON schema from a proto
> file, but that's only the specification half, not the using half.
> >> >>>>
> >> >>>> As it stands, the code generator I've been building these last
> months could (in principle) statically analyze a user's struct, and then
> generate an efficient dedicated coder for it. It just has no where to put
> them such that the Go SDK would use it.
> >> >>>>
> >> >>>>
> >> >>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com>
> wrote:
> >> >>>>>
> >> >>>>> I'll make a different suggestion. There's been some chatter that
> schemas are a better tool than coders, and that in Beam 3.0 we should make
> schemas the basic semantics instead of coders. Schemas provide everything a
> coder provides, but also allows for far more readable code. We can't make
> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
> we're better off starting with schemas instead of coders?
> >> >>>>>
> >> >>>>> Reuven
> >> >>>>>
> >> >>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
> wrote:
> >> >>>>>>
> >> >>>>>> One area that the Go SDK currently lacks: is the ability for
> users to specify their own coders for types.
> >> >>>>>>
> >> >>>>>> I've written a proposal document, and while I'm confident about
> the core, there are certainly some edge cases that require discussion
> before getting on with the implementation.
> >> >>>>>>
> >> >>>>>> At presently, the SDK only permits primitive value types (all
> numeric types but complex, strings, and []bytes) which are coded with beam
> coders, and structs whose exported fields are of those type, which is then
> encoded as JSON. Protocol buffer support is hacked in to avoid the type
> anaiyzer, and presents the current work around this issue.
> >> >>>>>>
> >> >>>>>> The high level proposal is to catch up with Python and Java, and
> have a coder registry. In addition, arrays, and maps should be permitted as
> well.
> >> >>>>>>
> >> >>>>>> If you have alternatives, or other suggestions and opinions, I'd
> love to hear them! Otherwise my intent is to get a PR ready by the end of
> January.
> >> >>>>>>
> >> >>>>>> Thanks!
> >> >>>>>> Robert Burke
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> --
> >> >>>> http://go/where-is-rebo
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Cheers,
> >> >> Gleb
>

Re: [Go SDK] User Defined Coders

Posted by Kenneth Knowles <ke...@apache.org>.
On Tue, Jan 8, 2019 at 7:44 AM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Jan 8, 2019 at 4:32 PM Reuven Lax <re...@google.com> wrote:
> >
> > Also while columnar can be a large perf win, I suspect that we currently
> have lower-hanging fruit to optimize when it comes to performance.
>
> It's probably a bigger win for Python than for Java.
>

And even more for SQL, or about the same since you are referring to
DataFrames which have roughly relational semantics. Having the columnar
model all the way to the data source would be big. Having near-zero-parsing
cost for transmitted data would be big. These changes would make Beam a
rather different project.

Kenn


>
> >
> > Reuven
> >
> > On Tue, Jan 8, 2019 at 5:25 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
> >> >
> >> > I looked at Apache Arrow as a potential serialization format for Row
> coders. At the time it didn't seem a perfect fit - Beam's programming model
> is record-at-a-time, and Arrow is optimized for large batches of records
> (while Beam has a concept of "bundles" they are completely non
> deterministic, and records might bundle different on retry). You could use
> Arrow with single-record batches, but I suspect that would end up adding a
> lot of extra overhead. That being said, I think it's still something worth
> investigating further.
> >>
> >> Though Beam's model is row-oriented, I think it'd make a lot of sense
> >> to support column-oriented transfer of data across the data plane
> >> (we're already concatenating serialized records lengthwise), with
> >> Arrow as a first candidate, and (either as part of the public API or
> >> as an implementation detail) columnar processing as well (e.g.
> >> projections, maps, filters, and aggregations can often be done more
> >> efficiently in a columnar fashion). While this is often a significant
> >> win in C++ (and presumably Java), it's essential for doing
> >> high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
> >> Tensorflow, ... all have batch-oriented APIs and avoid representing
> >> records as individual objects, something we'll need to tackle for
> >> BeamPython at least).
> >>
> >> >
> >> > Reuven
> >> >
> >> >
> >> >
> >> > On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com>
> wrote:
> >> >>
> >> >> Reuven, it sounds great. I see there is a similar thing to Row
> coders happening in Apache Arrow, and there is a similarity between Apache
> Arrow Flight and data exchange service in portability. How do you see these
> two things relate to each other in the long term?
> >> >>
> >> >> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
> >> >>>
> >> >>> The biggest advantage is actually readability and usability. A
> secondary advantage is that it means that Go will be able to interact
> seamlessly with BeamSQL, which would be a big win for Go.
> >> >>>
> >> >>> A schema is basically a way of saying that a record has a specific
> set of (possibly nested, possibly repeated) fields. So for instance let's
> say that the user's type is a struct with fields named user, country,
> purchaseCost. This allows us to provide transforms that operate on field
> names. Some example (using the Java API):
> >> >>>
> >> >>> PCollection users = events.apply(Select.fields("user"));  // Select
> out only the user field.
> >> >>>
> >> >>> PCollection joinedEvents =
> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
> PCollections by user.
> >> >>>
> >> >>> // For each country, calculate the total purchase cost as well as
> the top 10 purchases.
> >> >>> // A new schema is created containing fields total_cost and
> top_purchases, and rows are created with the aggregation results.
> >> >>> PCollection purchaseStatistics = events.apply(
> >> >>>     Group.byFieldNames("country")
> >> >>>                .aggregateField("purchaseCost", Sum.ofLongs(),
> "total_cost"))
> >> >>>                 .aggregateField("purchaseCost",
> Top.largestLongs(10), "top_purchases"))
> >> >>>
> >> >>>
> >> >>> This is far more readable than what we have today, and what unlocks
> this is that Beam actually knows the structure of the record instead of
> assuming records are uncrackable blobs.
> >> >>>
> >> >>> Note that a coder is basically a special case of a schema that has
> a single field.
> >> >>>
> >> >>> In BeamJava we have a SchemaRegistry which knows how to turn user
> types into schemas. We use reflection to analyze many user types (e.g.
> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
> etc.) to determine the schema, however this is done only when the graph is
> initially generated. We do use code generation (in Java we do bytecode
> generation) to make this somewhat more efficient. I'm willing to bet that
> the code generator you've written for structs could be very easily modified
> for schemas instead, so it would not be wasted work if we went with schemas.
> >> >>>
> >> >>> One of the things I'm working on now is documenting Beam schemas.
> They are already very powerful and useful, but since there is still nothing
> in our documentation about them, they are not yet widely used. I expect to
> finish draft documentation by the end of January.
> >> >>>
> >> >>> Reuven
> >> >>>
> >> >>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com>
> wrote:
> >> >>>>
> >> >>>> That's an interesting idea. I must confess I don't rightly know
> the difference between a schema and coder, but here's what I've got with a
> bit of searching through memory and the mailing list. Please let me know if
> I'm off track.
> >> >>>>
> >> >>>> As near as I can tell, a schema, as far as Beam takes it is a
> mechanism to define what data is extracted from a given row of data. So in
> principle, there's an opportunity to be more efficient with data with many
> columns that aren't being used, and only extract the data that's meaningful
> to the pipeline.
> >> >>>> The trick then is how to apply the schema to a given serialization
> format, which is something I'm missing in my mental model (and then how to
> do it efficiently in Go).
> >> >>>>
> >> >>>> I do know that the Go client package for BigQuery does something
> like that, using field tags. Similarly, the "encoding/json" package in the
> Go Standard Library permits annotating fields and it will read out and
> deserialize the JSON fields and that's it.
> >> >>>>
> >> >>>> A concern I have is that Go (at present) would require pre-compile
> time code generation for schemas to be efficient, and they would still
> mostly boil down to turning []bytes into real structs. Go reflection
> doesn't keep up.
> >> >>>> Go has no mechanism I'm aware of to Just In Time compile more
> efficient processing of values.
> >> >>>> It's also not 100% clear how Schema's would play with protocol
> buffers or similar.
> >> >>>> BigQuery has a mechanism of generating a JSON schema from a proto
> file, but that's only the specification half, not the using half.
> >> >>>>
> >> >>>> As it stands, the code generator I've been building these last
> months could (in principle) statically analyze a user's struct, and then
> generate an efficient dedicated coder for it. It just has no where to put
> them such that the Go SDK would use it.
> >> >>>>
> >> >>>>
> >> >>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com>
> wrote:
> >> >>>>>
> >> >>>>> I'll make a different suggestion. There's been some chatter that
> schemas are a better tool than coders, and that in Beam 3.0 we should make
> schemas the basic semantics instead of coders. Schemas provide everything a
> coder provides, but also allows for far more readable code. We can't make
> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
> we're better off starting with schemas instead of coders?
> >> >>>>>
> >> >>>>> Reuven
> >> >>>>>
> >> >>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
> wrote:
> >> >>>>>>
> >> >>>>>> One area that the Go SDK currently lacks: is the ability for
> users to specify their own coders for types.
> >> >>>>>>
> >> >>>>>> I've written a proposal document, and while I'm confident about
> the core, there are certainly some edge cases that require discussion
> before getting on with the implementation.
> >> >>>>>>
> >> >>>>>> At presently, the SDK only permits primitive value types (all
> numeric types but complex, strings, and []bytes) which are coded with beam
> coders, and structs whose exported fields are of those type, which is then
> encoded as JSON. Protocol buffer support is hacked in to avoid the type
> anaiyzer, and presents the current work around this issue.
> >> >>>>>>
> >> >>>>>> The high level proposal is to catch up with Python and Java, and
> have a coder registry. In addition, arrays, and maps should be permitted as
> well.
> >> >>>>>>
> >> >>>>>> If you have alternatives, or other suggestions and opinions, I'd
> love to hear them! Otherwise my intent is to get a PR ready by the end of
> January.
> >> >>>>>>
> >> >>>>>> Thanks!
> >> >>>>>> Robert Burke
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> --
> >> >>>> http://go/where-is-rebo
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Cheers,
> >> >> Gleb
>

Re: [Go SDK] User Defined Coders

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Jan 8, 2019 at 4:32 PM Reuven Lax <re...@google.com> wrote:
>
> I agree with this, but I think it's a significant rethinking of Beam that I didn't want to couple to schemas. In addition to rethinking the API, it might also require rethinking all of our runners.

We're already marshaling (including batching) data over the FnApi, so
it might not be that big of a change. Also, the choice of encoding
over the data channel is already parametrizable via a coder, so it's
easy to make this an optional feature that runners and SDKs can opt
into. I agree that we don't want to couple it to schemas (though
that's where it becomes even more useful).

> Also while columnar can be a large perf win, I suspect that we currently have lower-hanging fruit to optimize when it comes to performance.

It's probably a bigger win for Python than for Java.

>
> Reuven
>
> On Tue, Jan 8, 2019 at 5:25 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
>> >
>> > I looked at Apache Arrow as a potential serialization format for Row coders. At the time it didn't seem a perfect fit - Beam's programming model is record-at-a-time, and Arrow is optimized for large batches of records (while Beam has a concept of "bundles" they are completely non deterministic, and records might bundle different on retry). You could use Arrow with single-record batches, but I suspect that would end up adding a lot of extra overhead. That being said, I think it's still something worth investigating further.
>>
>> Though Beam's model is row-oriented, I think it'd make a lot of sense
>> to support column-oriented transfer of data across the data plane
>> (we're already concatenating serialized records lengthwise), with
>> Arrow as a first candidate, and (either as part of the public API or
>> as an implementation detail) columnar processing as well (e.g.
>> projections, maps, filters, and aggregations can often be done more
>> efficiently in a columnar fashion). While this is often a significant
>> win in C++ (and presumably Java), it's essential for doing
>> high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
>> Tensorflow, ... all have batch-oriented APIs and avoid representing
>> records as individual objects, something we'll need to tackle for
>> BeamPython at least).
>>
>> >
>> > Reuven
>> >
>> >
>> >
>> > On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com> wrote:
>> >>
>> >> Reuven, it sounds great. I see there is a similar thing to Row coders happening in Apache Arrow, and there is a similarity between Apache Arrow Flight and data exchange service in portability. How do you see these two things relate to each other in the long term?
>> >>
>> >> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>> >>>
>> >>> The biggest advantage is actually readability and usability. A secondary advantage is that it means that Go will be able to interact seamlessly with BeamSQL, which would be a big win for Go.
>> >>>
>> >>> A schema is basically a way of saying that a record has a specific set of (possibly nested, possibly repeated) fields. So for instance let's say that the user's type is a struct with fields named user, country, purchaseCost. This allows us to provide transforms that operate on field names. Some example (using the Java API):
>> >>>
>> >>> PCollection users = events.apply(Select.fields("user"));  // Select out only the user field.
>> >>>
>> >>> PCollection joinedEvents = queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two PCollections by user.
>> >>>
>> >>> // For each country, calculate the total purchase cost as well as the top 10 purchases.
>> >>> // A new schema is created containing fields total_cost and top_purchases, and rows are created with the aggregation results.
>> >>> PCollection purchaseStatistics = events.apply(
>> >>>     Group.byFieldNames("country")
>> >>>                .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost"))
>> >>>                 .aggregateField("purchaseCost", Top.largestLongs(10), "top_purchases"))
>> >>>
>> >>>
>> >>> This is far more readable than what we have today, and what unlocks this is that Beam actually knows the structure of the record instead of assuming records are uncrackable blobs.
>> >>>
>> >>> Note that a coder is basically a special case of a schema that has a single field.
>> >>>
>> >>> In BeamJava we have a SchemaRegistry which knows how to turn user types into schemas. We use reflection to analyze many user types (e.g. simple POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to determine the schema, however this is done only when the graph is initially generated. We do use code generation (in Java we do bytecode generation) to make this somewhat more efficient. I'm willing to bet that the code generator you've written for structs could be very easily modified for schemas instead, so it would not be wasted work if we went with schemas.
>> >>>
>> >>> One of the things I'm working on now is documenting Beam schemas. They are already very powerful and useful, but since there is still nothing in our documentation about them, they are not yet widely used. I expect to finish draft documentation by the end of January.
>> >>>
>> >>> Reuven
>> >>>
>> >>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>> >>>>
>> >>>> That's an interesting idea. I must confess I don't rightly know the difference between a schema and coder, but here's what I've got with a bit of searching through memory and the mailing list. Please let me know if I'm off track.
>> >>>>
>> >>>> As near as I can tell, a schema, as far as Beam takes it is a mechanism to define what data is extracted from a given row of data. So in principle, there's an opportunity to be more efficient with data with many columns that aren't being used, and only extract the data that's meaningful to the pipeline.
>> >>>> The trick then is how to apply the schema to a given serialization format, which is something I'm missing in my mental model (and then how to do it efficiently in Go).
>> >>>>
>> >>>> I do know that the Go client package for BigQuery does something like that, using field tags. Similarly, the "encoding/json" package in the Go Standard Library permits annotating fields and it will read out and deserialize the JSON fields and that's it.
>> >>>>
>> >>>> A concern I have is that Go (at present) would require pre-compile time code generation for schemas to be efficient, and they would still mostly boil down to turning []bytes into real structs. Go reflection doesn't keep up.
>> >>>> Go has no mechanism I'm aware of to Just In Time compile more efficient processing of values.
>> >>>> It's also not 100% clear how Schema's would play with protocol buffers or similar.
>> >>>> BigQuery has a mechanism of generating a JSON schema from a proto file, but that's only the specification half, not the using half.
>> >>>>
>> >>>> As it stands, the code generator I've been building these last months could (in principle) statically analyze a user's struct, and then generate an efficient dedicated coder for it. It just has no where to put them such that the Go SDK would use it.
>> >>>>
>> >>>>
>> >>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>> >>>>>
>> >>>>> I'll make a different suggestion. There's been some chatter that schemas are a better tool than coders, and that in Beam 3.0 we should make schemas the basic semantics instead of coders. Schemas provide everything a coder provides, but also allows for far more readable code. We can't make such a change in Beam Java 2.X for compatibility reasons, but maybe in Go we're better off starting with schemas instead of coders?
>> >>>>>
>> >>>>> Reuven
>> >>>>>
>> >>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>> >>>>>>
>> >>>>>> One area that the Go SDK currently lacks: is the ability for users to specify their own coders for types.
>> >>>>>>
>> >>>>>> I've written a proposal document, and while I'm confident about the core, there are certainly some edge cases that require discussion before getting on with the implementation.
>> >>>>>>
>> >>>>>> At presently, the SDK only permits primitive value types (all numeric types but complex, strings, and []bytes) which are coded with beam coders, and structs whose exported fields are of those type, which is then encoded as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer, and presents the current work around this issue.
>> >>>>>>
>> >>>>>> The high level proposal is to catch up with Python and Java, and have a coder registry. In addition, arrays, and maps should be permitted as well.
>> >>>>>>
>> >>>>>> If you have alternatives, or other suggestions and opinions, I'd love to hear them! Otherwise my intent is to get a PR ready by the end of January.
>> >>>>>>
>> >>>>>> Thanks!
>> >>>>>> Robert Burke
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> http://go/where-is-rebo
>> >>
>> >>
>> >>
>> >> --
>> >> Cheers,
>> >> Gleb

Re: [Go SDK] User Defined Coders

Posted by Reuven Lax <re...@google.com>.
I agree with this, but I think it's a significant rethinking of Beam that I
didn't want to couple to schemas. In addition to rethinking the API, it
might also require rethinking all of our runners.

Also while columnar can be a large perf win, I suspect that we currently
have lower-hanging fruit to optimize when it comes to performance.

Reuven

On Tue, Jan 8, 2019 at 5:25 AM Robert Bradshaw <ro...@google.com> wrote:

> On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
> >
> > I looked at Apache Arrow as a potential serialization format for Row
> coders. At the time it didn't seem a perfect fit - Beam's programming model
> is record-at-a-time, and Arrow is optimized for large batches of records
> (while Beam has a concept of "bundles" they are completely non
> deterministic, and records might bundle different on retry). You could use
> Arrow with single-record batches, but I suspect that would end up adding a
> lot of extra overhead. That being said, I think it's still something worth
> investigating further.
>
> Though Beam's model is row-oriented, I think it'd make a lot of sense
> to support column-oriented transfer of data across the data plane
> (we're already concatenating serialized records lengthwise), with
> Arrow as a first candidate, and (either as part of the public API or
> as an implementation detail) columnar processing as well (e.g.
> projections, maps, filters, and aggregations can often be done more
> efficiently in a columnar fashion). While this is often a significant
> win in C++ (and presumably Java), it's essential for doing
> high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
> Tensorflow, ... all have batch-oriented APIs and avoid representing
> records as individual objects, something we'll need to tackle for
> BeamPython at least).
>
> >
> > Reuven
> >
> >
> >
> > On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com> wrote:
> >>
> >> Reuven, it sounds great. I see there is a similar thing to Row coders
> happening in Apache Arrow, and there is a similarity between Apache Arrow
> Flight and data exchange service in portability. How do you see these two
> things relate to each other in the long term?
> >>
> >> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
> >>>
> >>> The biggest advantage is actually readability and usability. A
> secondary advantage is that it means that Go will be able to interact
> seamlessly with BeamSQL, which would be a big win for Go.
> >>>
> >>> A schema is basically a way of saying that a record has a specific set
> of (possibly nested, possibly repeated) fields. So for instance let's say
> that the user's type is a struct with fields named user, country,
> purchaseCost. This allows us to provide transforms that operate on field
> names. Some example (using the Java API):
> >>>
> >>> PCollection users = events.apply(Select.fields("user"));  // Select
> out only the user field.
> >>>
> >>> PCollection joinedEvents =
> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
> PCollections by user.
> >>>
> >>> // For each country, calculate the total purchase cost as well as the
> top 10 purchases.
> >>> // A new schema is created containing fields total_cost and
> top_purchases, and rows are created with the aggregation results.
> >>> PCollection purchaseStatistics = events.apply(
> >>>     Group.byFieldNames("country")
> >>>                .aggregateField("purchaseCost", Sum.ofLongs(),
> "total_cost"))
> >>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
> "top_purchases"))
> >>>
> >>>
> >>> This is far more readable than what we have today, and what unlocks
> this is that Beam actually knows the structure of the record instead of
> assuming records are uncrackable blobs.
> >>>
> >>> Note that a coder is basically a special case of a schema that has a
> single field.
> >>>
> >>> In BeamJava we have a SchemaRegistry which knows how to turn user
> types into schemas. We use reflection to analyze many user types (e.g.
> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
> etc.) to determine the schema, however this is done only when the graph is
> initially generated. We do use code generation (in Java we do bytecode
> generation) to make this somewhat more efficient. I'm willing to bet that
> the code generator you've written for structs could be very easily modified
> for schemas instead, so it would not be wasted work if we went with schemas.
> >>>
> >>> One of the things I'm working on now is documenting Beam schemas. They
> are already very powerful and useful, but since there is still nothing in
> our documentation about them, they are not yet widely used. I expect to
> finish draft documentation by the end of January.
> >>>
> >>> Reuven
> >>>
> >>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
> >>>>
> >>>> That's an interesting idea. I must confess I don't rightly know the
> difference between a schema and coder, but here's what I've got with a bit
> of searching through memory and the mailing list. Please let me know if I'm
> off track.
> >>>>
> >>>> As near as I can tell, a schema, as far as Beam takes it is a
> mechanism to define what data is extracted from a given row of data. So in
> principle, there's an opportunity to be more efficient with data with many
> columns that aren't being used, and only extract the data that's meaningful
> to the pipeline.
> >>>> The trick then is how to apply the schema to a given serialization
> format, which is something I'm missing in my mental model (and then how to
> do it efficiently in Go).
> >>>>
> >>>> I do know that the Go client package for BigQuery does something like
> that, using field tags. Similarly, the "encoding/json" package in the Go
> Standard Library permits annotating fields and it will read out and
> deserialize the JSON fields and that's it.
> >>>>
> >>>> A concern I have is that Go (at present) would require pre-compile
> time code generation for schemas to be efficient, and they would still
> mostly boil down to turning []bytes into real structs. Go reflection
> doesn't keep up.
> >>>> Go has no mechanism I'm aware of to Just In Time compile more
> efficient processing of values.
> >>>> It's also not 100% clear how Schema's would play with protocol
> buffers or similar.
> >>>> BigQuery has a mechanism of generating a JSON schema from a proto
> file, but that's only the specification half, not the using half.
> >>>>
> >>>> As it stands, the code generator I've been building these last months
> could (in principle) statically analyze a user's struct, and then generate
> an efficient dedicated coder for it. It just has no where to put them such
> that the Go SDK would use it.
> >>>>
> >>>>
> >>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
> >>>>>
> >>>>> I'll make a different suggestion. There's been some chatter that
> schemas are a better tool than coders, and that in Beam 3.0 we should make
> schemas the basic semantics instead of coders. Schemas provide everything a
> coder provides, but also allows for far more readable code. We can't make
> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
> we're better off starting with schemas instead of coders?
> >>>>>
> >>>>> Reuven
> >>>>>
> >>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
> wrote:
> >>>>>>
> >>>>>> One area that the Go SDK currently lacks: is the ability for users
> to specify their own coders for types.
> >>>>>>
> >>>>>> I've written a proposal document, and while I'm confident about the
> core, there are certainly some edge cases that require discussion before
> getting on with the implementation.
> >>>>>>
> >>>>>> At presently, the SDK only permits primitive value types (all
> numeric types but complex, strings, and []bytes) which are coded with beam
> coders, and structs whose exported fields are of those type, which is then
> encoded as JSON. Protocol buffer support is hacked in to avoid the type
> anaiyzer, and presents the current work around this issue.
> >>>>>>
> >>>>>> The high level proposal is to catch up with Python and Java, and
> have a coder registry. In addition, arrays, and maps should be permitted as
> well.
> >>>>>>
> >>>>>> If you have alternatives, or other suggestions and opinions, I'd
> love to hear them! Otherwise my intent is to get a PR ready by the end of
> January.
> >>>>>>
> >>>>>> Thanks!
> >>>>>> Robert Burke
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> http://go/where-is-rebo
> >>
> >>
> >>
> >> --
> >> Cheers,
> >> Gleb
>

Re: [Go SDK] User Defined Coders

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Jan 4, 2019 at 12:54 AM Reuven Lax <re...@google.com> wrote:
>
> I looked at Apache Arrow as a potential serialization format for Row coders. At the time it didn't seem a perfect fit - Beam's programming model is record-at-a-time, and Arrow is optimized for large batches of records (while Beam has a concept of "bundles" they are completely non deterministic, and records might bundle different on retry). You could use Arrow with single-record batches, but I suspect that would end up adding a lot of extra overhead. That being said, I think it's still something worth investigating further.

Though Beam's model is row-oriented, I think it'd make a lot of sense
to support column-oriented transfer of data across the data plane
(we're already concatenating serialized records lengthwise), with
Arrow as a first candidate, and (either as part of the public API or
as an implementation detail) columnar processing as well (e.g.
projections, maps, filters, and aggregations can often be done more
efficiently in a columnar fashion). While this is often a significant
win in C++ (and presumably Java), it's essential for doing
high-performance computing in Python (e.g. Numpy, SciPy, Pandas,
Tensorflow, ... all have batch-oriented APIs and avoid representing
records as individual objects, something we'll need to tackle for
BeamPython at least).

>
> Reuven
>
>
>
> On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com> wrote:
>>
>> Reuven, it sounds great. I see there is a similar thing to Row coders happening in Apache Arrow, and there is a similarity between Apache Arrow Flight and data exchange service in portability. How do you see these two things relate to each other in the long term?
>>
>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>
>>> The biggest advantage is actually readability and usability. A secondary advantage is that it means that Go will be able to interact seamlessly with BeamSQL, which would be a big win for Go.
>>>
>>> A schema is basically a way of saying that a record has a specific set of (possibly nested, possibly repeated) fields. So for instance let's say that the user's type is a struct with fields named user, country, purchaseCost. This allows us to provide transforms that operate on field names. Some example (using the Java API):
>>>
>>> PCollection users = events.apply(Select.fields("user"));  // Select out only the user field.
>>>
>>> PCollection joinedEvents = queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two PCollections by user.
>>>
>>> // For each country, calculate the total purchase cost as well as the top 10 purchases.
>>> // A new schema is created containing fields total_cost and top_purchases, and rows are created with the aggregation results.
>>> PCollection purchaseStatistics = events.apply(
>>>     Group.byFieldNames("country")
>>>                .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost"))
>>>                 .aggregateField("purchaseCost", Top.largestLongs(10), "top_purchases"))
>>>
>>>
>>> This is far more readable than what we have today, and what unlocks this is that Beam actually knows the structure of the record instead of assuming records are uncrackable blobs.
>>>
>>> Note that a coder is basically a special case of a schema that has a single field.
>>>
>>> In BeamJava we have a SchemaRegistry which knows how to turn user types into schemas. We use reflection to analyze many user types (e.g. simple POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to determine the schema, however this is done only when the graph is initially generated. We do use code generation (in Java we do bytecode generation) to make this somewhat more efficient. I'm willing to bet that the code generator you've written for structs could be very easily modified for schemas instead, so it would not be wasted work if we went with schemas.
>>>
>>> One of the things I'm working on now is documenting Beam schemas. They are already very powerful and useful, but since there is still nothing in our documentation about them, they are not yet widely used. I expect to finish draft documentation by the end of January.
>>>
>>> Reuven
>>>
>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>>
>>>> That's an interesting idea. I must confess I don't rightly know the difference between a schema and coder, but here's what I've got with a bit of searching through memory and the mailing list. Please let me know if I'm off track.
>>>>
>>>> As near as I can tell, a schema, as far as Beam takes it is a mechanism to define what data is extracted from a given row of data. So in principle, there's an opportunity to be more efficient with data with many columns that aren't being used, and only extract the data that's meaningful to the pipeline.
>>>> The trick then is how to apply the schema to a given serialization format, which is something I'm missing in my mental model (and then how to do it efficiently in Go).
>>>>
>>>> I do know that the Go client package for BigQuery does something like that, using field tags. Similarly, the "encoding/json" package in the Go Standard Library permits annotating fields and it will read out and deserialize the JSON fields and that's it.
>>>>
>>>> A concern I have is that Go (at present) would require pre-compile time code generation for schemas to be efficient, and they would still mostly boil down to turning []bytes into real structs. Go reflection doesn't keep up.
>>>> Go has no mechanism I'm aware of to Just In Time compile more efficient processing of values.
>>>> It's also not 100% clear how Schema's would play with protocol buffers or similar.
>>>> BigQuery has a mechanism of generating a JSON schema from a proto file, but that's only the specification half, not the using half.
>>>>
>>>> As it stands, the code generator I've been building these last months could (in principle) statically analyze a user's struct, and then generate an efficient dedicated coder for it. It just has no where to put them such that the Go SDK would use it.
>>>>
>>>>
>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>> I'll make a different suggestion. There's been some chatter that schemas are a better tool than coders, and that in Beam 3.0 we should make schemas the basic semantics instead of coders. Schemas provide everything a coder provides, but also allows for far more readable code. We can't make such a change in Beam Java 2.X for compatibility reasons, but maybe in Go we're better off starting with schemas instead of coders?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>>>>>>
>>>>>> One area that the Go SDK currently lacks: is the ability for users to specify their own coders for types.
>>>>>>
>>>>>> I've written a proposal document, and while I'm confident about the core, there are certainly some edge cases that require discussion before getting on with the implementation.
>>>>>>
>>>>>> At presently, the SDK only permits primitive value types (all numeric types but complex, strings, and []bytes) which are coded with beam coders, and structs whose exported fields are of those type, which is then encoded as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer, and presents the current work around this issue.
>>>>>>
>>>>>> The high level proposal is to catch up with Python and Java, and have a coder registry. In addition, arrays, and maps should be permitted as well.
>>>>>>
>>>>>> If you have alternatives, or other suggestions and opinions, I'd love to hear them! Otherwise my intent is to get a PR ready by the end of January.
>>>>>>
>>>>>> Thanks!
>>>>>> Robert Burke
>>>>
>>>>
>>>>
>>>> --
>>>> http://go/where-is-rebo
>>
>>
>>
>> --
>> Cheers,
>> Gleb

Re: [Go SDK] User Defined Coders

Posted by Reuven Lax <re...@google.com>.
I looked at Apache Arrow as a potential serialization format for Row
coders. At the time it didn't seem a perfect fit - Beam's programming model
is record-at-a-time, and Arrow is optimized for large batches of records
(while Beam has a concept of "bundles" they are completely non
deterministic, and records might bundle different on retry). You could use
Arrow with single-record batches, but I suspect that would end up adding a
lot of extra overhead. That being said, I think it's still something worth
investigating further.

Reuven



On Fri, Jan 4, 2019 at 12:34 AM Gleb Kanterov <gl...@spotify.com> wrote:

> Reuven, it sounds great. I see there is a similar thing to Row coders
> happening in Apache Arrow <https://arrow.apache.org>, and there is a
> similarity between Apache Arrow Flight
> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
> and data exchange service in portability. How do you see these two things
> relate to each other in the long term?
>
> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>
>> The biggest advantage is actually readability and usability. A secondary
>> advantage is that it means that Go will be able to interact seamlessly with
>> BeamSQL, which would be a big win for Go.
>>
>> A schema is basically a way of saying that a record has a specific set of
>> (possibly nested, possibly repeated) fields. So for instance let's say that
>> the user's type is a struct with fields named user, country, purchaseCost.
>> This allows us to provide transforms that operate on field names. Some
>> example (using the Java API):
>>
>> PCollection users = events.apply(Select.fields("user"));  // Select out
>> only the user field.
>>
>> PCollection joinedEvents =
>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>> PCollections by user.
>>
>> // For each country, calculate the total purchase cost as well as the top
>> 10 purchases.
>> // A new schema is created containing fields total_cost and
>> top_purchases, and rows are created with the aggregation results.
>> PCollection purchaseStatistics = events.apply(
>>     Group.byFieldNames("country")
>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>> "total_cost"))
>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>> "top_purchases"))
>>
>>
>> This is far more readable than what we have today, and what unlocks this
>> is that Beam actually knows the structure of the record instead of assuming
>> records are uncrackable blobs.
>>
>> Note that a coder is basically a special case of a schema that has a
>> single field.
>>
>> In BeamJava we have a SchemaRegistry which knows how to turn user types
>> into schemas. We use reflection to analyze many user types (e.g. simple
>> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to
>> determine the schema, however this is done only when the graph is initially
>> generated. We do use code generation (in Java we do bytecode generation) to
>> make this somewhat more efficient. I'm willing to bet that the code
>> generator you've written for structs could be very easily modified for
>> schemas instead, so it would not be wasted work if we went with schemas.
>>
>> One of the things I'm working on now is documenting Beam schemas. They
>> are already very powerful and useful, but since there is still nothing in
>> our documentation about them, they are not yet widely used. I expect to
>> finish draft documentation by the end of January.
>>
>> Reuven
>>
>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>
>>> That's an interesting idea. I must confess I don't rightly know the
>>> difference between a schema and coder, but here's what I've got with a bit
>>> of searching through memory and the mailing list. Please let me know if I'm
>>> off track.
>>>
>>> As near as I can tell, a schema, as far as Beam takes it
>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>> a mechanism to define what data is extracted from a given row of data. So
>>> in principle, there's an opportunity to be more efficient with data with
>>> many columns that aren't being used, and only extract the data that's
>>> meaningful to the pipeline.
>>> The trick then is how to apply the schema to a given serialization
>>> format, which is something I'm missing in my mental model (and then how to
>>> do it efficiently in Go).
>>>
>>> I do know that the Go client package for BigQuery
>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>> something like that, using field tags. Similarly, the "encoding/json"
>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>> Standard Library permits annotating fields and it will read out and
>>> deserialize the JSON fields and that's it.
>>>
>>> A concern I have is that Go (at present) would require pre-compile time
>>> code generation for schemas to be efficient, and they would still mostly
>>> boil down to turning []bytes into real structs. Go reflection doesn't keep
>>> up.
>>> Go has no mechanism I'm aware of to Just In Time compile more efficient
>>> processing of values.
>>> It's also not 100% clear how Schema's would play with protocol buffers
>>> or similar.
>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>> that's only the specification half, not the using half.
>>>
>>> As it stands, the code generator I've been building these last months
>>> could (in principle) statically analyze a user's struct, and then generate
>>> an efficient dedicated coder for it. It just has no where to put them such
>>> that the Go SDK would use it.
>>>
>>>
>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I'll make a different suggestion. There's been some chatter that
>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>> coder provides, but also allows for far more readable code. We can't make
>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>> we're better off starting with schemas instead of coders?
>>>>
>>>> Reuven
>>>>
>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>>>>
>>>>> One area that the Go SDK currently lacks: is the ability for users to
>>>>> specify their own coders for types.
>>>>>
>>>>> I've written a proposal document,
>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>> that require discussion before getting on with the implementation.
>>>>>
>>>>> At presently, the SDK only permits primitive value types (all numeric
>>>>> types but complex, strings, and []bytes) which are coded with beam coders,
>>>>> and structs whose exported fields are of those type, which is then encoded
>>>>> as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer,
>>>>> and presents the current work around this issue.
>>>>>
>>>>> The high level proposal is to catch up with Python and Java, and have
>>>>> a coder registry. In addition, arrays, and maps should be permitted as well.
>>>>>
>>>>> If you have alternatives, or other suggestions and opinions, I'd love
>>>>> to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>> January.
>>>>>
>>>>> Thanks!
>>>>> Robert Burke
>>>>>
>>>>
>>>
>>> --
>>> http://go/where-is-rebo
>>>
>>
>
> --
> Cheers,
> Gleb
>

Re: Schemas in the Go SDK

Posted by Robert Burke <ro...@frantil.com>.
Having slept on it here are my thoughts, but granted, AFAICT there is no
spec for schema's so my understanding is based on what I've learned in the
last 18-ish hours. If there is a spec, I'd love to see it.

*1.* Default behavior to support Schema's in some way doesn't remove the
need for certain specific uses of an atomic coder for a type. eg.
Specifying that Beam shouldn't look further into this type.

TBH the interaction between schema's and coders is the least interesting
part about schemas and matters in precious few circumstances. In
particular, when Grouping By Key, it seems like the schema coder should be
used by default but otherwise, not. Further, there's always the option to
"try the schema" encoding and should that fail, try any existing atomic
coder by default, though this risks data corruption in some situations.

*1.a *In a later beam version, it could be true that there's no need for
such uses. There's always the option to work around anything by writing at
DoFn that accepts a []byte, and then produces a given type. However
decoding []byte and encoding back again seems like a common enough
operation for some domains that having direct beam support in some capacity
is desirable for performance reasons.

*2.* It would be easy enough to have a pipeline fail at construction time
should a type not be able to derive a schema for itself, and it's put into
a schema required scenario.

*3.* The Go SDK does recursive type analysis to be able encode types
<https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/serialize.go#L346>
for coders anyway, as Go has no native concept of "serializable types" or
"serializable functions" It wouldn't be too much of a stretch to convert
this representation to a Portable Schema representation.

When materializing types, Go has extensively defined Type Conversion rules
<https://golang.org/ref/spec#Conversions> which are accessible via the
reflect package. This means that we can always synthetically create an
instance of a real type from something like a schema, assuming they match
field for field. Eg. If a user declares a PCollection with a given Schema,
then in principle it would be possible to use that PCollection as an input
with a field for field compatible real struct type, and have this verified
at construction time. The "extra sauce" would be to have this happen for a
subset of fields for convenient extraction, ala the annotation use in java.

In particular, this means that whenever the Go SDK is in a scenario that it
doesn't have a schema*, it could probably create one ad-hoc *for that
context, and use the atomic coder the rest of the time if available.
Whether we want it do so is another matter, and probably situation specific.

*4. *It seems Long Term (in that it will be eventually be done, not that it
will necessarily take a long time to get there), that Schemas are likely
the interchange format for Cross Language pipeline support. That is, when
an SDK is invoking a transform in a different language (say, Beam Go
calling on Beam SQL), the values could be specified, and returned in the
schema format, to ensure compatibility. The trick here is that the expected
return schema still needs to be explicitly specified from the user in some
circumstances. (eg. Going from a SQL statement -> Schema doesn't seem like
a natural fit, and won't necessarily be available at pipeline construction
time in the remote language.)

*5.* An interesting aspect of schemas is that they fundamentally enable
SDKs to start with a light DSL layer with "known" types and
transforms/combines/joins, which then never need to be invoked on the SDK
layer. Runners could each implement schemas directly and avoid unnecessary
FnAPI hops for improved performance, largely because they know the type's
structure. No need for any of it to be implemented SDK side to start.

 Overall this is a noble goal in that it enables more languages more
easily, but it's concerning from my view, in that the other goal is to
enable data processing in the SDK language, and this moves it farther away
from the more general, if verbose approaches to do the same thing.

I'm on the side of Scalable Data Processing in Go, which ideally entails
writing Go, rather than an abstract DSL.


I don't speak for all Go users, and welcome hearing from others.

On Thu, 3 Jan 2019 at 17:52 Robert Burke <ro...@frantil.com> wrote:

> At this point I feel like the schema discussion should be a separate
> thread from having a Coder Registry in Go, which was the original topic, so
> I'm forking it.
>
> It does sounds like adding Schemas to the Go SDK would be a much larger
> extension than the registry.
>
> I'm not convinced that not having a convenient registry would serve Go SDK
> users (such as they exist).
>
> The concern I have isn't so much for Ints or Doubles, but for user types
> such as Protocol Buffers, but not just those. There will be some users who
> prize efficiency first, and readability second. The Go SDK presently uses
> JSON encoding by default which has many of the properties of schemas, but
> is severely limiting for power users.
>
>
> It sounds like the following are true:
> 1. Full use of the Schemas in the Go SDK will require FnAPI support.
> * Until the FnAPI supports it, and the semantics are implemented in the
> ULR, the Go SDK probably shouldn't begin to implement against it.
> * This is identical to Go's lack of SplitableDoFn keeping Beam Go
> pipelines from scaling or from having Cross Language IO, which is also a
> precursor to BeamGo using Beam SQL.
> 2. The main collision between Schemas and Coders are in the event that a
> given type has both defined for it: Which is being used and when?
> * This seems to me more to do with being able to enable use of the
> syntactic sugar or not, but we know that at construction time, by the very
> use of the sugar.
> * If a file wants to materialize a file encoded with the Schema, one would
> need to encode that in the DoFn doing the writing somehow (eg. ForceSchema
> or ForceCoder, whichever we want to make the default). This has pipeline
> compatibility implications.
>
> It's not presently possible for Go to annotate function parameters, but
> something could be worked out, similarly to how SideInputs are configured
> in the Go SDK. I'd be concerned about the efficiency of those operations
> though, even with Generics or code generation.
>
>
> On Thu, 3 Jan 2019 at 16:33 Reuven Lax <re...@google.com> wrote:
>
>> On Fri, Jan 4, 2019 at 1:19 AM Robert Burke <ro...@frantil.com> wrote:
>>
>>> Very interesting Reuven!
>>>
>>> That would be a huge readability improvement, but it would also be a
>>> significant investment over my time budget to implement them on the Go side
>>> correctly. I would certainly want to read your documentation before going
>>> ahead.  Will the Portability FnAPI have dedicated Schema support? That
>>> would certainly change things.
>>>
>>
>> Yes, there's absolutely a plan to add schema definitions to the FnAPI.
>> This is what will allow you to use SQL from BeamGo
>>
>>>
>>> It's not clear to me how one might achieve the inversion from
>>> SchemaCoder being a special casing of CustomCoder to the other way around,
>>> since a field has a type, and that type needs to be encoded. Short of
>>> always encoding the primitive values in the way Beam prefers, it doesn't
>>> seem to allow for customizing the encoding on output, or really say
>>> anything outside of the (admittedly excellent) syntactic sugar demonstrated
>>> with the Java API.
>>>
>>
>> I'm not quite sure I understand. But schemas define a fixed set of
>> primitive types, and also define the encodings for those primitive types.
>> If a user wants custom encoding for a primitive type, they can create a
>> byte-array field and wrap that field with a Coder (this is why I said that
>> todays Coders are simply special cases); this should be very rare though,
>> as users rarely should care how Beam encodes a long or a double.
>>
>>>
>>> Offhand, Schemas seem to be an alternative to pipeline construction,
>>> rather than coders for value serialization, allowing manual field
>>> extraction code to be omitted. They do not appear to be a fundamental
>>> approach to achieve it. For example, the grouping operation still needs to
>>> encode the whole of the object as a value.
>>>
>>
>> Schemas are properties of the data - essentially a Schema is the data
>> type of a PCollection. In Java Schemas are also understood by ParDo, so you
>> can write a ParDo like this:
>>
>> @ProcessElement
>> public void process(@Field("user") String userId,  @Field("country")
>> String countryCode) {
>> }
>>
>> These extra functionalities are part of the graph, but they are enabled
>> by schemas.
>>
>>>
>>> As mentioned, I'm hoping to have a solution for existing coders by
>>> January's end, so waiting for your documentation doesn't work on that
>>> timeline.
>>>
>>
>> I don't think we need to wait for all the documentation to be written.
>>
>>
>>>
>>> That said, they aren't incompatible ideas as demonstrated by the Java
>>> implementation. The Go SDK remains in an experimental state. We can change
>>> things should the need arise in the next few months. Further, whenever Generics
>>> in Go
>>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>>> crop up, the existing user surface and execution stack will need to be
>>> re-written to take advantage of them anyway. That provides an opportunity
>>> to invert Coder vs Schema dependence while getting a nice performance
>>> boost, and cleaner code (and deleting much of my code generator).
>>>
>>> ----
>>>
>>> Were I to implement schemas to get the same syntatic benefits as the
>>> Java API, I'd be leveraging the field annotations Go has. This satisfies
>>> the protocol buffer issue as well, since generated go protos have name &
>>> json annotations. Schemas could be extracted that way. These are also
>>> available to anything using static analysis for more direct generation of
>>> accessors. The reflective approach would also work, which is excellent for
>>> development purposes.
>>>
>>> The rote code that the schemas were replacing would be able to be
>>> cobbled together into efficient DoFn and CombineFns for serialization. At
>>> present, it seems like it could be implemented as a side package that uses
>>> beam, rather than changing portions of the core beam Go packages, The real
>>> trick would be to do so without "apply" since that's not how the Go SDK is
>>> shaped.
>>>
>>>
>>>
>>>
>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>>
>>>> Reuven, it sounds great. I see there is a similar thing to Row coders
>>>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>>>> similarity between Apache Arrow Flight
>>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>>> and data exchange service in portability. How do you see these two things
>>>> relate to each other in the long term?
>>>>
>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> The biggest advantage is actually readability and usability. A
>>>>> secondary advantage is that it means that Go will be able to interact
>>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>>
>>>>> A schema is basically a way of saying that a record has a specific set
>>>>> of (possibly nested, possibly repeated) fields. So for instance let's say
>>>>> that the user's type is a struct with fields named user, country,
>>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>>> names. Some example (using the Java API):
>>>>>
>>>>> PCollection users = events.apply(Select.fields("user"));  // Select
>>>>> out only the user field.
>>>>>
>>>>> PCollection joinedEvents =
>>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>>> PCollections by user.
>>>>>
>>>>> // For each country, calculate the total purchase cost as well as the
>>>>> top 10 purchases.
>>>>> // A new schema is created containing fields total_cost and
>>>>> top_purchases, and rows are created with the aggregation results.
>>>>> PCollection purchaseStatistics = events.apply(
>>>>>     Group.byFieldNames("country")
>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>>> "total_cost"))
>>>>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>>>>> "top_purchases"))
>>>>>
>>>>>
>>>>> This is far more readable than what we have today, and what unlocks
>>>>> this is that Beam actually knows the structure of the record instead of
>>>>> assuming records are uncrackable blobs.
>>>>>
>>>>> Note that a coder is basically a special case of a schema that has a
>>>>> single field.
>>>>>
>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user
>>>>> types into schemas. We use reflection to analyze many user types (e.g.
>>>>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>>>>> etc.) to determine the schema, however this is done only when the graph is
>>>>> initially generated. We do use code generation (in Java we do bytecode
>>>>> generation) to make this somewhat more efficient. I'm willing to bet that
>>>>> the code generator you've written for structs could be very easily modified
>>>>> for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>
>>>>> One of the things I'm working on now is documenting Beam schemas. They
>>>>> are already very powerful and useful, but since there is still nothing in
>>>>> our documentation about them, they are not yet widely used. I expect to
>>>>> finish draft documentation by the end of January.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>>>
>>>>>> That's an interesting idea. I must confess I don't rightly know the
>>>>>> difference between a schema and coder, but here's what I've got with a bit
>>>>>> of searching through memory and the mailing list. Please let me know if I'm
>>>>>> off track.
>>>>>>
>>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>>> many columns that aren't being used, and only extract the data that's
>>>>>> meaningful to the pipeline.
>>>>>> The trick then is how to apply the schema to a given serialization
>>>>>> format, which is something I'm missing in my mental model (and then how to
>>>>>> do it efficiently in Go).
>>>>>>
>>>>>> I do know that the Go client package for BigQuery
>>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>>> something like that, using field tags. Similarly, the "encoding/json"
>>>>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>>>>> Standard Library permits annotating fields and it will read out and
>>>>>> deserialize the JSON fields and that's it.
>>>>>>
>>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>>> time code generation for schemas to be efficient, and they would still
>>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>>> doesn't keep up.
>>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>>> efficient processing of values.
>>>>>> It's also not 100% clear how Schema's would play with protocol
>>>>>> buffers or similar.
>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>>>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>>>>> that's only the specification half, not the using half.
>>>>>>
>>>>>> As it stands, the code generator I've been building these last months
>>>>>> could (in principle) statically analyze a user's struct, and then generate
>>>>>> an efficient dedicated coder for it. It just has no where to put them such
>>>>>> that the Go SDK would use it.
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>>> we're better off starting with schemas instead of coders?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> One area that the Go SDK currently lacks: is the ability for users
>>>>>>>> to specify their own coders for types.
>>>>>>>>
>>>>>>>> I've written a proposal document,
>>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>>
>>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>>
>>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>>> have a coder registry. In addition, arrays, and maps should be permitted as
>>>>>>>> well.
>>>>>>>>
>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>>>> January.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Robert Burke
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> http://go/where-is-rebo
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> Gleb
>>>>
>>>

Schemas in the Go SDK

Posted by Robert Burke <ro...@frantil.com>.
At this point I feel like the schema discussion should be a separate thread
from having a Coder Registry in Go, which was the original topic, so I'm
forking it.

It does sounds like adding Schemas to the Go SDK would be a much larger
extension than the registry.

I'm not convinced that not having a convenient registry would serve Go SDK
users (such as they exist).

The concern I have isn't so much for Ints or Doubles, but for user types
such as Protocol Buffers, but not just those. There will be some users who
prize efficiency first, and readability second. The Go SDK presently uses
JSON encoding by default which has many of the properties of schemas, but
is severely limiting for power users.


It sounds like the following are true:
1. Full use of the Schemas in the Go SDK will require FnAPI support.
* Until the FnAPI supports it, and the semantics are implemented in the
ULR, the Go SDK probably shouldn't begin to implement against it.
* This is identical to Go's lack of SplitableDoFn keeping Beam Go pipelines
from scaling or from having Cross Language IO, which is also a precursor to
BeamGo using Beam SQL.
2. The main collision between Schemas and Coders are in the event that a
given type has both defined for it: Which is being used and when?
* This seems to me more to do with being able to enable use of the
syntactic sugar or not, but we know that at construction time, by the very
use of the sugar.
* If a file wants to materialize a file encoded with the Schema, one would
need to encode that in the DoFn doing the writing somehow (eg. ForceSchema
or ForceCoder, whichever we want to make the default). This has pipeline
compatibility implications.

It's not presently possible for Go to annotate function parameters, but
something could be worked out, similarly to how SideInputs are configured
in the Go SDK. I'd be concerned about the efficiency of those operations
though, even with Generics or code generation.


On Thu, 3 Jan 2019 at 16:33 Reuven Lax <re...@google.com> wrote:

> On Fri, Jan 4, 2019 at 1:19 AM Robert Burke <ro...@frantil.com> wrote:
>
>> Very interesting Reuven!
>>
>> That would be a huge readability improvement, but it would also be a
>> significant investment over my time budget to implement them on the Go side
>> correctly. I would certainly want to read your documentation before going
>> ahead.  Will the Portability FnAPI have dedicated Schema support? That
>> would certainly change things.
>>
>
> Yes, there's absolutely a plan to add schema definitions to the FnAPI.
> This is what will allow you to use SQL from BeamGo
>
>>
>> It's not clear to me how one might achieve the inversion from SchemaCoder
>> being a special casing of CustomCoder to the other way around, since a
>> field has a type, and that type needs to be encoded. Short of always
>> encoding the primitive values in the way Beam prefers, it doesn't seem to
>> allow for customizing the encoding on output, or really say anything
>> outside of the (admittedly excellent) syntactic sugar demonstrated with the
>> Java API.
>>
>
> I'm not quite sure I understand. But schemas define a fixed set of
> primitive types, and also define the encodings for those primitive types.
> If a user wants custom encoding for a primitive type, they can create a
> byte-array field and wrap that field with a Coder (this is why I said that
> todays Coders are simply special cases); this should be very rare though,
> as users rarely should care how Beam encodes a long or a double.
>
>>
>> Offhand, Schemas seem to be an alternative to pipeline construction,
>> rather than coders for value serialization, allowing manual field
>> extraction code to be omitted. They do not appear to be a fundamental
>> approach to achieve it. For example, the grouping operation still needs to
>> encode the whole of the object as a value.
>>
>
> Schemas are properties of the data - essentially a Schema is the data type
> of a PCollection. In Java Schemas are also understood by ParDo, so you can
> write a ParDo like this:
>
> @ProcessElement
> public void process(@Field("user") String userId,  @Field("country")
> String countryCode) {
> }
>
> These extra functionalities are part of the graph, but they are enabled by
> schemas.
>
>>
>> As mentioned, I'm hoping to have a solution for existing coders by
>> January's end, so waiting for your documentation doesn't work on that
>> timeline.
>>
>
> I don't think we need to wait for all the documentation to be written.
>
>
>>
>> That said, they aren't incompatible ideas as demonstrated by the Java
>> implementation. The Go SDK remains in an experimental state. We can change
>> things should the need arise in the next few months. Further, whenever Generics
>> in Go
>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>> crop up, the existing user surface and execution stack will need to be
>> re-written to take advantage of them anyway. That provides an opportunity
>> to invert Coder vs Schema dependence while getting a nice performance
>> boost, and cleaner code (and deleting much of my code generator).
>>
>> ----
>>
>> Were I to implement schemas to get the same syntatic benefits as the Java
>> API, I'd be leveraging the field annotations Go has. This satisfies the
>> protocol buffer issue as well, since generated go protos have name & json
>> annotations. Schemas could be extracted that way. These are also available
>> to anything using static analysis for more direct generation of accessors.
>> The reflective approach would also work, which is excellent for development
>> purposes.
>>
>> The rote code that the schemas were replacing would be able to be cobbled
>> together into efficient DoFn and CombineFns for serialization. At present,
>> it seems like it could be implemented as a side package that uses beam,
>> rather than changing portions of the core beam Go packages, The real trick
>> would be to do so without "apply" since that's not how the Go SDK is shaped.
>>
>>
>>
>>
>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>
>>> Reuven, it sounds great. I see there is a similar thing to Row coders
>>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>>> similarity between Apache Arrow Flight
>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>> and data exchange service in portability. How do you see these two things
>>> relate to each other in the long term?
>>>
>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The biggest advantage is actually readability and usability. A
>>>> secondary advantage is that it means that Go will be able to interact
>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>
>>>> A schema is basically a way of saying that a record has a specific set
>>>> of (possibly nested, possibly repeated) fields. So for instance let's say
>>>> that the user's type is a struct with fields named user, country,
>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>> names. Some example (using the Java API):
>>>>
>>>> PCollection users = events.apply(Select.fields("user"));  // Select out
>>>> only the user field.
>>>>
>>>> PCollection joinedEvents =
>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>> PCollections by user.
>>>>
>>>> // For each country, calculate the total purchase cost as well as the
>>>> top 10 purchases.
>>>> // A new schema is created containing fields total_cost and
>>>> top_purchases, and rows are created with the aggregation results.
>>>> PCollection purchaseStatistics = events.apply(
>>>>     Group.byFieldNames("country")
>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>> "total_cost"))
>>>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>>>> "top_purchases"))
>>>>
>>>>
>>>> This is far more readable than what we have today, and what unlocks
>>>> this is that Beam actually knows the structure of the record instead of
>>>> assuming records are uncrackable blobs.
>>>>
>>>> Note that a coder is basically a special case of a schema that has a
>>>> single field.
>>>>
>>>> In BeamJava we have a SchemaRegistry which knows how to turn user types
>>>> into schemas. We use reflection to analyze many user types (e.g. simple
>>>> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to
>>>> determine the schema, however this is done only when the graph is initially
>>>> generated. We do use code generation (in Java we do bytecode generation) to
>>>> make this somewhat more efficient. I'm willing to bet that the code
>>>> generator you've written for structs could be very easily modified for
>>>> schemas instead, so it would not be wasted work if we went with schemas.
>>>>
>>>> One of the things I'm working on now is documenting Beam schemas. They
>>>> are already very powerful and useful, but since there is still nothing in
>>>> our documentation about them, they are not yet widely used. I expect to
>>>> finish draft documentation by the end of January.
>>>>
>>>> Reuven
>>>>
>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>>
>>>>> That's an interesting idea. I must confess I don't rightly know the
>>>>> difference between a schema and coder, but here's what I've got with a bit
>>>>> of searching through memory and the mailing list. Please let me know if I'm
>>>>> off track.
>>>>>
>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>> many columns that aren't being used, and only extract the data that's
>>>>> meaningful to the pipeline.
>>>>> The trick then is how to apply the schema to a given serialization
>>>>> format, which is something I'm missing in my mental model (and then how to
>>>>> do it efficiently in Go).
>>>>>
>>>>> I do know that the Go client package for BigQuery
>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>> something like that, using field tags. Similarly, the "encoding/json"
>>>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>>>> Standard Library permits annotating fields and it will read out and
>>>>> deserialize the JSON fields and that's it.
>>>>>
>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>> time code generation for schemas to be efficient, and they would still
>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>> doesn't keep up.
>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>> efficient processing of values.
>>>>> It's also not 100% clear how Schema's would play with protocol buffers
>>>>> or similar.
>>>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>>>> that's only the specification half, not the using half.
>>>>>
>>>>> As it stands, the code generator I've been building these last months
>>>>> could (in principle) statically analyze a user's struct, and then generate
>>>>> an efficient dedicated coder for it. It just has no where to put them such
>>>>> that the Go SDK would use it.
>>>>>
>>>>>
>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>> we're better off starting with schemas instead of coders?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>> wrote:
>>>>>>
>>>>>>> One area that the Go SDK currently lacks: is the ability for users
>>>>>>> to specify their own coders for types.
>>>>>>>
>>>>>>> I've written a proposal document,
>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>
>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>
>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>> have a coder registry. In addition, arrays, and maps should be permitted as
>>>>>>> well.
>>>>>>>
>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>>> January.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Robert Burke
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> http://go/where-is-rebo
>>>>>
>>>>
>>>
>>> --
>>> Cheers,
>>> Gleb
>>>
>>

Re: [Go SDK] User Defined Coders

Posted by Reuven Lax <re...@google.com>.
Maybe a good first step would be to write a doc explaining how this would
work in the Go SDK and share with the dev list. It's possible we will
decide to just implement Coders first, however that way this will be done
with everyone fully understanding the design tradeoffs.

Reuven

On Fri, Jan 4, 2019 at 7:05 PM Kenneth Knowles <ke...@apache.org> wrote:

> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <re...@google.com> wrote:
>
>> If a user wants custom encoding for a primitive type, they can create a
>> byte-array field and wrap that field with a Coder
>>
>
> This is the crux of the issue, right?
>
> Roughly, today, we've got:
>
>         Schema ::= [ (fieldname, Type) ]
>
>         Type ::= AtomicType | Array<Type> | Map<Type, Type> |
> Struct<Schema>
>
>         AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...
>
> To fully replace custom encodings as they exist, you need:
>
>         AtomicType ::= bytes<CustomCoder> | ...
>
> At this point, an SDK need not surface the concept of "Coder" to a user at
> all outside the bytes field concept and the wire encoding and efficient
> should be identical or nearly to what we do with coders today. PCollections
> in such an SDK have schemas, not coders, so we have successfully turned it
> completely inside-out relative to how the Java SDK does it. Is that what
> you have in mind?
>
> I really like this, but I agree with Robert that this is a major change
> that takes a bunch of work and a lot more collaborative thinking in design
> docs if we hope to get it right/stable.
>
> Kenn
>
>
>> (this is why I said that todays Coders are simply special cases); this
>> should be very rare though, as users rarely should care how Beam encodes a
>> long or a double.
>>
>>>
>>> Offhand, Schemas seem to be an alternative to pipeline construction,
>>> rather than coders for value serialization, allowing manual field
>>> extraction code to be omitted. They do not appear to be a fundamental
>>> approach to achieve it. For example, the grouping operation still needs to
>>> encode the whole of the object as a value.
>>>
>>
>> Schemas are properties of the data - essentially a Schema is the data
>> type of a PCollection. In Java Schemas are also understood by ParDo, so you
>> can write a ParDo like this:
>>
>> @ProcessElement
>> public void process(@Field("user") String userId,  @Field("country")
>> String countryCode) {
>> }
>>
>> These extra functionalities are part of the graph, but they are enabled
>> by schemas.
>>
>>>
>>> As mentioned, I'm hoping to have a solution for existing coders by
>>> January's end, so waiting for your documentation doesn't work on that
>>> timeline.
>>>
>>
>> I don't think we need to wait for all the documentation to be written.
>>
>>
>>>
>>> That said, they aren't incompatible ideas as demonstrated by the Java
>>> implementation. The Go SDK remains in an experimental state. We can change
>>> things should the need arise in the next few months. Further, whenever Generics
>>> in Go
>>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>>> crop up, the existing user surface and execution stack will need to be
>>> re-written to take advantage of them anyway. That provides an opportunity
>>> to invert Coder vs Schema dependence while getting a nice performance
>>> boost, and cleaner code (and deleting much of my code generator).
>>>
>>> ----
>>>
>>> Were I to implement schemas to get the same syntatic benefits as the
>>> Java API, I'd be leveraging the field annotations Go has. This satisfies
>>> the protocol buffer issue as well, since generated go protos have name &
>>> json annotations. Schemas could be extracted that way. These are also
>>> available to anything using static analysis for more direct generation of
>>> accessors. The reflective approach would also work, which is excellent for
>>> development purposes.
>>>
>>> The rote code that the schemas were replacing would be able to be
>>> cobbled together into efficient DoFn and CombineFns for serialization. At
>>> present, it seems like it could be implemented as a side package that uses
>>> beam, rather than changing portions of the core beam Go packages, The real
>>> trick would be to do so without "apply" since that's not how the Go SDK is
>>> shaped.
>>>
>>>
>>>
>>>
>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>>
>>>> Reuven, it sounds great. I see there is a similar thing to Row coders
>>>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>>>> similarity between Apache Arrow Flight
>>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>>> and data exchange service in portability. How do you see these two things
>>>> relate to each other in the long term?
>>>>
>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> The biggest advantage is actually readability and usability. A
>>>>> secondary advantage is that it means that Go will be able to interact
>>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>>
>>>>> A schema is basically a way of saying that a record has a specific set
>>>>> of (possibly nested, possibly repeated) fields. So for instance let's say
>>>>> that the user's type is a struct with fields named user, country,
>>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>>> names. Some example (using the Java API):
>>>>>
>>>>> PCollection users = events.apply(Select.fields("user"));  // Select
>>>>> out only the user field.
>>>>>
>>>>> PCollection joinedEvents =
>>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>>> PCollections by user.
>>>>>
>>>>> // For each country, calculate the total purchase cost as well as the
>>>>> top 10 purchases.
>>>>> // A new schema is created containing fields total_cost and
>>>>> top_purchases, and rows are created with the aggregation results.
>>>>> PCollection purchaseStatistics = events.apply(
>>>>>     Group.byFieldNames("country")
>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>>> "total_cost"))
>>>>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>>>>> "top_purchases"))
>>>>>
>>>>>
>>>>> This is far more readable than what we have today, and what unlocks
>>>>> this is that Beam actually knows the structure of the record instead of
>>>>> assuming records are uncrackable blobs.
>>>>>
>>>>> Note that a coder is basically a special case of a schema that has a
>>>>> single field.
>>>>>
>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user
>>>>> types into schemas. We use reflection to analyze many user types (e.g.
>>>>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>>>>> etc.) to determine the schema, however this is done only when the graph is
>>>>> initially generated. We do use code generation (in Java we do bytecode
>>>>> generation) to make this somewhat more efficient. I'm willing to bet that
>>>>> the code generator you've written for structs could be very easily modified
>>>>> for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>
>>>>> One of the things I'm working on now is documenting Beam schemas. They
>>>>> are already very powerful and useful, but since there is still nothing in
>>>>> our documentation about them, they are not yet widely used. I expect to
>>>>> finish draft documentation by the end of January.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>>>
>>>>>> That's an interesting idea. I must confess I don't rightly know the
>>>>>> difference between a schema and coder, but here's what I've got with a bit
>>>>>> of searching through memory and the mailing list. Please let me know if I'm
>>>>>> off track.
>>>>>>
>>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>>> many columns that aren't being used, and only extract the data that's
>>>>>> meaningful to the pipeline.
>>>>>> The trick then is how to apply the schema to a given serialization
>>>>>> format, which is something I'm missing in my mental model (and then how to
>>>>>> do it efficiently in Go).
>>>>>>
>>>>>> I do know that the Go client package for BigQuery
>>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>>> something like that, using field tags. Similarly, the "encoding/json"
>>>>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>>>>> Standard Library permits annotating fields and it will read out and
>>>>>> deserialize the JSON fields and that's it.
>>>>>>
>>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>>> time code generation for schemas to be efficient, and they would still
>>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>>> doesn't keep up.
>>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>>> efficient processing of values.
>>>>>> It's also not 100% clear how Schema's would play with protocol
>>>>>> buffers or similar.
>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>>>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>>>>> that's only the specification half, not the using half.
>>>>>>
>>>>>> As it stands, the code generator I've been building these last months
>>>>>> could (in principle) statically analyze a user's struct, and then generate
>>>>>> an efficient dedicated coder for it. It just has no where to put them such
>>>>>> that the Go SDK would use it.
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>>> we're better off starting with schemas instead of coders?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> One area that the Go SDK currently lacks: is the ability for users
>>>>>>>> to specify their own coders for types.
>>>>>>>>
>>>>>>>> I've written a proposal document,
>>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>>
>>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>>
>>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>>> have a coder registry. In addition, arrays, and maps should be permitted as
>>>>>>>> well.
>>>>>>>>
>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>>>> January.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Robert Burke
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> http://go/where-is-rebo
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> Gleb
>>>>
>>>

Re: [Go SDK] User Defined Coders

Posted by Andrew Pilloud <ap...@google.com>.
+1 on this. I think we are somewhat lacking on a written design for Schemas
in Java. This would be really useful in driving adoption and expanding to
other languages.

Andrew

On Mon, Jan 7, 2019 at 3:43 PM Robert Burke <ro...@frantil.com> wrote:

> Might I see the design doc (not code) for how they're supposed to look and
> work in Java first? I'd rather not write a document based on a speculative
> understanding of Schemas based on the littany of assumptions I'm making
> about them.
>
>
> On Mon, Jan 7, 2019, 2:35 PM Reuven Lax <re...@google.com> wrote:
>
>> I suggest that we write out a design of what schemas in go would look
>> like and how it would interact with coders. We'll then be in a much better
>> position to decide what the right short-term path forward is. Even if we
>> decide it makes more sense to build up the coder support first, I think
>> this will guide us; e.g. we can build up the coder support in a way that
>> can be extended to full schemas later.
>>
>> Writing up an overview design shouldn't take too much time and I think is
>> definitely worth it.
>>
>> Reuven
>>
>> On Mon, Jan 7, 2019 at 2:12 PM Robert Burke <ro...@frantil.com> wrote:
>>
>>> Kenn has pointed out to me that Coders are not likely going to vanish in
>>> the next  while, in particular over the FnAPI, so having a coder registry
>>> does remain useful, as described by an early adopter in another thread.
>>>
>>> On Fri, Jan 4, 2019, 10:51 AM Robert Burke <ro...@frantil.com> wrote:
>>>
>>>> I think you're right Kenn.
>>>>
>>>> Reuven alluded to the difficulty in inference of what to use between
>>>> AtomicType and the rest, in particular Struct<Schema>.
>>>>
>>>> Go has the additional concerns around Pointer vs Non Pointer types
>>>> which isn't a concern either Python or Java have, but has implications on
>>>> pipeline efficiency that need addressing, in particular, being able to use
>>>> them in a useful fashion in the Go SDK.
>>>>
>>>> I agree that long term, having schemas as a default codec would be
>>>> hugely beneficial for readability, composability, and allows more
>>>> processing to be on the Runner Harness side of a worker. (I'll save the
>>>> rest of my thoughts on Schemas in Go for the other thread, and say no more
>>>> of it here.)
>>>>
>>>> *Regarding my proposal for User Defined Coders:*
>>>>
>>>> To avoid users accidentally preventing themselves from using Schemas in
>>>> the future, I need to remove the ability to override the default coder *(4).
>>>> *Then instead of JSON coding by default *(5)*, the SDK should be doing
>>>> Schema coding. The SDK is already doing the recursive type analysis on
>>>> types at pipeline construction time, so it's not a huge stretch to support
>>>> Schemas using that information in the future, once Runner & FnAPI support
>>>> begins to exist.
>>>>
>>>> *(1)* doesn't seem to need changing, as this is the existing
>>>> AtomicType definition Kenn pointed out.
>>>>
>>>> *(2)* is the specific AtomicType override.
>>>>
>>>> *(3) *is the broader Go specific override for Go's unique interface
>>>> semantics. This most of the cases *(4)* would have covered anyway, but
>>>> in a targeted way.
>>>>
>>>> This should still allow Go users to better control their pipeline, and
>>>> associated performance implications (which is my goal in this change),
>>>> while not making an overall incompatible choice for powerful beam features
>>>> for the common case in the future.
>>>>
>>>> Does that sound right?
>>>>
>>>> On Fri, 4 Jan 2019 at 10:05 Kenneth Knowles <ke...@apache.org> wrote:
>>>>
>>>>> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> If a user wants custom encoding for a primitive type, they can create
>>>>>> a byte-array field and wrap that field with a Coder
>>>>>>
>>>>>
>>>>> This is the crux of the issue, right?
>>>>>
>>>>> Roughly, today, we've got:
>>>>>
>>>>>         Schema ::= [ (fieldname, Type) ]
>>>>>
>>>>>         Type ::= AtomicType | Array<Type> | Map<Type, Type> |
>>>>> Struct<Schema>
>>>>>
>>>>>         AtomicType ::= bytes | int{16, 32, 64} | datetime | string |
>>>>> ...
>>>>>
>>>>> To fully replace custom encodings as they exist, you need:
>>>>>
>>>>>         AtomicType ::= bytes<CustomCoder> | ...
>>>>>
>>>>> At this point, an SDK need not surface the concept of "Coder" to a
>>>>> user at all outside the bytes field concept and the wire encoding and
>>>>> efficient should be identical or nearly to what we do with coders today.
>>>>> PCollections in such an SDK have schemas, not coders, so we have
>>>>> successfully turned it completely inside-out relative to how the Java SDK
>>>>> does it. Is that what you have in mind?
>>>>>
>>>>> I really like this, but I agree with Robert that this is a major
>>>>> change that takes a bunch of work and a lot more collaborative thinking in
>>>>> design docs if we hope to get it right/stable.
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>> (this is why I said that todays Coders are simply special cases);
>>>>>> this should be very rare though, as users rarely should care how Beam
>>>>>> encodes a long or a double.
>>>>>>
>>>>>>>
>>>>>>> Offhand, Schemas seem to be an alternative to pipeline construction,
>>>>>>> rather than coders for value serialization, allowing manual field
>>>>>>> extraction code to be omitted. They do not appear to be a fundamental
>>>>>>> approach to achieve it. For example, the grouping operation still needs to
>>>>>>> encode the whole of the object as a value.
>>>>>>>
>>>>>>
>>>>>> Schemas are properties of the data - essentially a Schema is the data
>>>>>> type of a PCollection. In Java Schemas are also understood by ParDo, so you
>>>>>> can write a ParDo like this:
>>>>>>
>>>>>> @ProcessElement
>>>>>> public void process(@Field("user") String userId,  @Field("country")
>>>>>> String countryCode) {
>>>>>> }
>>>>>>
>>>>>> These extra functionalities are part of the graph, but they are
>>>>>> enabled by schemas.
>>>>>>
>>>>>>>
>>>>>>> As mentioned, I'm hoping to have a solution for existing coders by
>>>>>>> January's end, so waiting for your documentation doesn't work on that
>>>>>>> timeline.
>>>>>>>
>>>>>>
>>>>>> I don't think we need to wait for all the documentation to be
>>>>>> written.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> That said, they aren't incompatible ideas as demonstrated by the
>>>>>>> Java implementation. The Go SDK remains in an experimental state. We can
>>>>>>> change things should the need arise in the next few months. Further,
>>>>>>> whenever Generics in Go
>>>>>>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>>>>>>> crop up, the existing user surface and execution stack will need to be
>>>>>>> re-written to take advantage of them anyway. That provides an opportunity
>>>>>>> to invert Coder vs Schema dependence while getting a nice performance
>>>>>>> boost, and cleaner code (and deleting much of my code generator).
>>>>>>>
>>>>>>> ----
>>>>>>>
>>>>>>> Were I to implement schemas to get the same syntatic benefits as the
>>>>>>> Java API, I'd be leveraging the field annotations Go has. This satisfies
>>>>>>> the protocol buffer issue as well, since generated go protos have name &
>>>>>>> json annotations. Schemas could be extracted that way. These are also
>>>>>>> available to anything using static analysis for more direct generation of
>>>>>>> accessors. The reflective approach would also work, which is excellent for
>>>>>>> development purposes.
>>>>>>>
>>>>>>> The rote code that the schemas were replacing would be able to be
>>>>>>> cobbled together into efficient DoFn and CombineFns for serialization. At
>>>>>>> present, it seems like it could be implemented as a side package that uses
>>>>>>> beam, rather than changing portions of the core beam Go packages, The real
>>>>>>> trick would be to do so without "apply" since that's not how the Go SDK is
>>>>>>> shaped.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>>>>>>
>>>>>>>> Reuven, it sounds great. I see there is a similar thing to Row
>>>>>>>> coders happening in Apache Arrow <https://arrow.apache.org>, and
>>>>>>>> there is a similarity between Apache Arrow Flight
>>>>>>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>>>>>>> and data exchange service in portability. How do you see these two things
>>>>>>>> relate to each other in the long term?
>>>>>>>>
>>>>>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The biggest advantage is actually readability and usability. A
>>>>>>>>> secondary advantage is that it means that Go will be able to interact
>>>>>>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>>>>>>
>>>>>>>>> A schema is basically a way of saying that a record has a specific
>>>>>>>>> set of (possibly nested, possibly repeated) fields. So for instance let's
>>>>>>>>> say that the user's type is a struct with fields named user, country,
>>>>>>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>>>>>>> names. Some example (using the Java API):
>>>>>>>>>
>>>>>>>>> PCollection users = events.apply(Select.fields("user"));  //
>>>>>>>>> Select out only the user field.
>>>>>>>>>
>>>>>>>>> PCollection joinedEvents =
>>>>>>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>>>>>>> PCollections by user.
>>>>>>>>>
>>>>>>>>> // For each country, calculate the total purchase cost as well as
>>>>>>>>> the top 10 purchases.
>>>>>>>>> // A new schema is created containing fields total_cost and
>>>>>>>>> top_purchases, and rows are created with the aggregation results.
>>>>>>>>> PCollection purchaseStatistics = events.apply(
>>>>>>>>>     Group.byFieldNames("country")
>>>>>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>>>>>>> "total_cost"))
>>>>>>>>>                 .aggregateField("purchaseCost",
>>>>>>>>> Top.largestLongs(10), "top_purchases"))
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is far more readable than what we have today, and what
>>>>>>>>> unlocks this is that Beam actually knows the structure of the record
>>>>>>>>> instead of assuming records are uncrackable blobs.
>>>>>>>>>
>>>>>>>>> Note that a coder is basically a special case of a schema that has
>>>>>>>>> a single field.
>>>>>>>>>
>>>>>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user
>>>>>>>>> types into schemas. We use reflection to analyze many user types (e.g.
>>>>>>>>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>>>>>>>>> etc.) to determine the schema, however this is done only when the graph is
>>>>>>>>> initially generated. We do use code generation (in Java we do bytecode
>>>>>>>>> generation) to make this somewhat more efficient. I'm willing to bet that
>>>>>>>>> the code generator you've written for structs could be very easily modified
>>>>>>>>> for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>>>>>
>>>>>>>>> One of the things I'm working on now is documenting Beam schemas.
>>>>>>>>> They are already very powerful and useful, but since there is still nothing
>>>>>>>>> in our documentation about them, they are not yet widely used. I expect to
>>>>>>>>> finish draft documentation by the end of January.
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> That's an interesting idea. I must confess I don't rightly know
>>>>>>>>>> the difference between a schema and coder, but here's what I've got with a
>>>>>>>>>> bit of searching through memory and the mailing list. Please let me know if
>>>>>>>>>> I'm off track.
>>>>>>>>>>
>>>>>>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>>>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>>>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>>>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>>>>>>> many columns that aren't being used, and only extract the data that's
>>>>>>>>>> meaningful to the pipeline.
>>>>>>>>>> The trick then is how to apply the schema to a given
>>>>>>>>>> serialization format, which is something I'm missing in my mental model
>>>>>>>>>> (and then how to do it efficiently in Go).
>>>>>>>>>>
>>>>>>>>>> I do know that the Go client package for BigQuery
>>>>>>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas>
>>>>>>>>>> does something like that, using field tags. Similarly, the
>>>>>>>>>> "encoding/json"
>>>>>>>>>> <https://golang.org/doc/articles/json_and_go.html> package in
>>>>>>>>>> the Go Standard Library permits annotating fields and it will read out and
>>>>>>>>>> deserialize the JSON fields and that's it.
>>>>>>>>>>
>>>>>>>>>> A concern I have is that Go (at present) would require
>>>>>>>>>> pre-compile time code generation for schemas to be efficient, and they
>>>>>>>>>> would still mostly boil down to turning []bytes into real structs. Go
>>>>>>>>>> reflection doesn't keep up.
>>>>>>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>>>>>>> efficient processing of values.
>>>>>>>>>> It's also not 100% clear how Schema's would play with protocol
>>>>>>>>>> buffers or similar.
>>>>>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto
>>>>>>>>>> file
>>>>>>>>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>,
>>>>>>>>>> but that's only the specification half, not the using half.
>>>>>>>>>>
>>>>>>>>>> As it stands, the code generator I've been building these last
>>>>>>>>>> months could (in principle) statically analyze a user's struct, and then
>>>>>>>>>> generate an efficient dedicated coder for it. It just has no where to put
>>>>>>>>>> them such that the Go SDK would use it.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>>>>>>> we're better off starting with schemas instead of coders?
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> One area that the Go SDK currently lacks: is the ability for
>>>>>>>>>>>> users to specify their own coders for types.
>>>>>>>>>>>>
>>>>>>>>>>>> I've written a proposal document,
>>>>>>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>>>>>>
>>>>>>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>>>>>>
>>>>>>>>>>>> The high level proposal is to catch up with Python and Java,
>>>>>>>>>>>> and have a coder registry. In addition, arrays, and maps should be
>>>>>>>>>>>> permitted as well.
>>>>>>>>>>>>
>>>>>>>>>>>> If you have alternatives, or other suggestions and opinions,
>>>>>>>>>>>> I'd love to hear them! Otherwise my intent is to get a PR ready by the end
>>>>>>>>>>>> of January.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Robert Burke
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> http://go/where-is-rebo
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cheers,
>>>>>>>> Gleb
>>>>>>>>
>>>>>>>

Re: [Go SDK] User Defined Coders

Posted by Robert Burke <ro...@frantil.com>.
Might I see the design doc (not code) for how they're supposed to look and
work in Java first? I'd rather not write a document based on a speculative
understanding of Schemas based on the littany of assumptions I'm making
about them.


On Mon, Jan 7, 2019, 2:35 PM Reuven Lax <re...@google.com> wrote:

> I suggest that we write out a design of what schemas in go would look like
> and how it would interact with coders. We'll then be in a much better
> position to decide what the right short-term path forward is. Even if we
> decide it makes more sense to build up the coder support first, I think
> this will guide us; e.g. we can build up the coder support in a way that
> can be extended to full schemas later.
>
> Writing up an overview design shouldn't take too much time and I think is
> definitely worth it.
>
> Reuven
>
> On Mon, Jan 7, 2019 at 2:12 PM Robert Burke <ro...@frantil.com> wrote:
>
>> Kenn has pointed out to me that Coders are not likely going to vanish in
>> the next  while, in particular over the FnAPI, so having a coder registry
>> does remain useful, as described by an early adopter in another thread.
>>
>> On Fri, Jan 4, 2019, 10:51 AM Robert Burke <ro...@frantil.com> wrote:
>>
>>> I think you're right Kenn.
>>>
>>> Reuven alluded to the difficulty in inference of what to use between
>>> AtomicType and the rest, in particular Struct<Schema>.
>>>
>>> Go has the additional concerns around Pointer vs Non Pointer types which
>>> isn't a concern either Python or Java have, but has implications on
>>> pipeline efficiency that need addressing, in particular, being able to use
>>> them in a useful fashion in the Go SDK.
>>>
>>> I agree that long term, having schemas as a default codec would be
>>> hugely beneficial for readability, composability, and allows more
>>> processing to be on the Runner Harness side of a worker. (I'll save the
>>> rest of my thoughts on Schemas in Go for the other thread, and say no more
>>> of it here.)
>>>
>>> *Regarding my proposal for User Defined Coders:*
>>>
>>> To avoid users accidentally preventing themselves from using Schemas in
>>> the future, I need to remove the ability to override the default coder *(4).
>>> *Then instead of JSON coding by default *(5)*, the SDK should be doing
>>> Schema coding. The SDK is already doing the recursive type analysis on
>>> types at pipeline construction time, so it's not a huge stretch to support
>>> Schemas using that information in the future, once Runner & FnAPI support
>>> begins to exist.
>>>
>>> *(1)* doesn't seem to need changing, as this is the existing AtomicType
>>> definition Kenn pointed out.
>>>
>>> *(2)* is the specific AtomicType override.
>>>
>>> *(3) *is the broader Go specific override for Go's unique interface
>>> semantics. This most of the cases *(4)* would have covered anyway, but
>>> in a targeted way.
>>>
>>> This should still allow Go users to better control their pipeline, and
>>> associated performance implications (which is my goal in this change),
>>> while not making an overall incompatible choice for powerful beam features
>>> for the common case in the future.
>>>
>>> Does that sound right?
>>>
>>> On Fri, 4 Jan 2019 at 10:05 Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> If a user wants custom encoding for a primitive type, they can create
>>>>> a byte-array field and wrap that field with a Coder
>>>>>
>>>>
>>>> This is the crux of the issue, right?
>>>>
>>>> Roughly, today, we've got:
>>>>
>>>>         Schema ::= [ (fieldname, Type) ]
>>>>
>>>>         Type ::= AtomicType | Array<Type> | Map<Type, Type> |
>>>> Struct<Schema>
>>>>
>>>>         AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...
>>>>
>>>> To fully replace custom encodings as they exist, you need:
>>>>
>>>>         AtomicType ::= bytes<CustomCoder> | ...
>>>>
>>>> At this point, an SDK need not surface the concept of "Coder" to a user
>>>> at all outside the bytes field concept and the wire encoding and efficient
>>>> should be identical or nearly to what we do with coders today. PCollections
>>>> in such an SDK have schemas, not coders, so we have successfully turned it
>>>> completely inside-out relative to how the Java SDK does it. Is that what
>>>> you have in mind?
>>>>
>>>> I really like this, but I agree with Robert that this is a major change
>>>> that takes a bunch of work and a lot more collaborative thinking in design
>>>> docs if we hope to get it right/stable.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>> (this is why I said that todays Coders are simply special cases); this
>>>>> should be very rare though, as users rarely should care how Beam encodes a
>>>>> long or a double.
>>>>>
>>>>>>
>>>>>> Offhand, Schemas seem to be an alternative to pipeline construction,
>>>>>> rather than coders for value serialization, allowing manual field
>>>>>> extraction code to be omitted. They do not appear to be a fundamental
>>>>>> approach to achieve it. For example, the grouping operation still needs to
>>>>>> encode the whole of the object as a value.
>>>>>>
>>>>>
>>>>> Schemas are properties of the data - essentially a Schema is the data
>>>>> type of a PCollection. In Java Schemas are also understood by ParDo, so you
>>>>> can write a ParDo like this:
>>>>>
>>>>> @ProcessElement
>>>>> public void process(@Field("user") String userId,  @Field("country")
>>>>> String countryCode) {
>>>>> }
>>>>>
>>>>> These extra functionalities are part of the graph, but they are
>>>>> enabled by schemas.
>>>>>
>>>>>>
>>>>>> As mentioned, I'm hoping to have a solution for existing coders by
>>>>>> January's end, so waiting for your documentation doesn't work on that
>>>>>> timeline.
>>>>>>
>>>>>
>>>>> I don't think we need to wait for all the documentation to be written.
>>>>>
>>>>>
>>>>>>
>>>>>> That said, they aren't incompatible ideas as demonstrated by the Java
>>>>>> implementation. The Go SDK remains in an experimental state. We can change
>>>>>> things should the need arise in the next few months. Further, whenever Generics
>>>>>> in Go
>>>>>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>>>>>> crop up, the existing user surface and execution stack will need to be
>>>>>> re-written to take advantage of them anyway. That provides an opportunity
>>>>>> to invert Coder vs Schema dependence while getting a nice performance
>>>>>> boost, and cleaner code (and deleting much of my code generator).
>>>>>>
>>>>>> ----
>>>>>>
>>>>>> Were I to implement schemas to get the same syntatic benefits as the
>>>>>> Java API, I'd be leveraging the field annotations Go has. This satisfies
>>>>>> the protocol buffer issue as well, since generated go protos have name &
>>>>>> json annotations. Schemas could be extracted that way. These are also
>>>>>> available to anything using static analysis for more direct generation of
>>>>>> accessors. The reflective approach would also work, which is excellent for
>>>>>> development purposes.
>>>>>>
>>>>>> The rote code that the schemas were replacing would be able to be
>>>>>> cobbled together into efficient DoFn and CombineFns for serialization. At
>>>>>> present, it seems like it could be implemented as a side package that uses
>>>>>> beam, rather than changing portions of the core beam Go packages, The real
>>>>>> trick would be to do so without "apply" since that's not how the Go SDK is
>>>>>> shaped.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>>>>>
>>>>>>> Reuven, it sounds great. I see there is a similar thing to Row
>>>>>>> coders happening in Apache Arrow <https://arrow.apache.org>, and
>>>>>>> there is a similarity between Apache Arrow Flight
>>>>>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>>>>>> and data exchange service in portability. How do you see these two things
>>>>>>> relate to each other in the long term?
>>>>>>>
>>>>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> The biggest advantage is actually readability and usability. A
>>>>>>>> secondary advantage is that it means that Go will be able to interact
>>>>>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>>>>>
>>>>>>>> A schema is basically a way of saying that a record has a specific
>>>>>>>> set of (possibly nested, possibly repeated) fields. So for instance let's
>>>>>>>> say that the user's type is a struct with fields named user, country,
>>>>>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>>>>>> names. Some example (using the Java API):
>>>>>>>>
>>>>>>>> PCollection users = events.apply(Select.fields("user"));  // Select
>>>>>>>> out only the user field.
>>>>>>>>
>>>>>>>> PCollection joinedEvents =
>>>>>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>>>>>> PCollections by user.
>>>>>>>>
>>>>>>>> // For each country, calculate the total purchase cost as well as
>>>>>>>> the top 10 purchases.
>>>>>>>> // A new schema is created containing fields total_cost and
>>>>>>>> top_purchases, and rows are created with the aggregation results.
>>>>>>>> PCollection purchaseStatistics = events.apply(
>>>>>>>>     Group.byFieldNames("country")
>>>>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>>>>>> "total_cost"))
>>>>>>>>                 .aggregateField("purchaseCost",
>>>>>>>> Top.largestLongs(10), "top_purchases"))
>>>>>>>>
>>>>>>>>
>>>>>>>> This is far more readable than what we have today, and what unlocks
>>>>>>>> this is that Beam actually knows the structure of the record instead of
>>>>>>>> assuming records are uncrackable blobs.
>>>>>>>>
>>>>>>>> Note that a coder is basically a special case of a schema that has
>>>>>>>> a single field.
>>>>>>>>
>>>>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user
>>>>>>>> types into schemas. We use reflection to analyze many user types (e.g.
>>>>>>>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>>>>>>>> etc.) to determine the schema, however this is done only when the graph is
>>>>>>>> initially generated. We do use code generation (in Java we do bytecode
>>>>>>>> generation) to make this somewhat more efficient. I'm willing to bet that
>>>>>>>> the code generator you've written for structs could be very easily modified
>>>>>>>> for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>>>>
>>>>>>>> One of the things I'm working on now is documenting Beam schemas.
>>>>>>>> They are already very powerful and useful, but since there is still nothing
>>>>>>>> in our documentation about them, they are not yet widely used. I expect to
>>>>>>>> finish draft documentation by the end of January.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> That's an interesting idea. I must confess I don't rightly know
>>>>>>>>> the difference between a schema and coder, but here's what I've got with a
>>>>>>>>> bit of searching through memory and the mailing list. Please let me know if
>>>>>>>>> I'm off track.
>>>>>>>>>
>>>>>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>>>>>> many columns that aren't being used, and only extract the data that's
>>>>>>>>> meaningful to the pipeline.
>>>>>>>>> The trick then is how to apply the schema to a given serialization
>>>>>>>>> format, which is something I'm missing in my mental model (and then how to
>>>>>>>>> do it efficiently in Go).
>>>>>>>>>
>>>>>>>>> I do know that the Go client package for BigQuery
>>>>>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>>>>>> something like that, using field tags. Similarly, the
>>>>>>>>> "encoding/json" <https://golang.org/doc/articles/json_and_go.html> package
>>>>>>>>> in the Go Standard Library permits annotating fields and it will read out
>>>>>>>>> and deserialize the JSON fields and that's it.
>>>>>>>>>
>>>>>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>>>>>> time code generation for schemas to be efficient, and they would still
>>>>>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>>>>>> doesn't keep up.
>>>>>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>>>>>> efficient processing of values.
>>>>>>>>> It's also not 100% clear how Schema's would play with protocol
>>>>>>>>> buffers or similar.
>>>>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto
>>>>>>>>> file <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>,
>>>>>>>>> but that's only the specification half, not the using half.
>>>>>>>>>
>>>>>>>>> As it stands, the code generator I've been building these last
>>>>>>>>> months could (in principle) statically analyze a user's struct, and then
>>>>>>>>> generate an efficient dedicated coder for it. It just has no where to put
>>>>>>>>> them such that the Go SDK would use it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>>>>>> we're better off starting with schemas instead of coders?
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> One area that the Go SDK currently lacks: is the ability for
>>>>>>>>>>> users to specify their own coders for types.
>>>>>>>>>>>
>>>>>>>>>>> I've written a proposal document,
>>>>>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>>>>>
>>>>>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>>>>>
>>>>>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>>>>>> have a coder registry. In addition, arrays, and maps should be permitted as
>>>>>>>>>>> well.
>>>>>>>>>>>
>>>>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>>>>>>> January.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Robert Burke
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> http://go/where-is-rebo
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cheers,
>>>>>>> Gleb
>>>>>>>
>>>>>>

Re: [Go SDK] User Defined Coders

Posted by Reuven Lax <re...@google.com>.
I suggest that we write out a design of what schemas in go would look like
and how it would interact with coders. We'll then be in a much better
position to decide what the right short-term path forward is. Even if we
decide it makes more sense to build up the coder support first, I think
this will guide us; e.g. we can build up the coder support in a way that
can be extended to full schemas later.

Writing up an overview design shouldn't take too much time and I think is
definitely worth it.

Reuven

On Mon, Jan 7, 2019 at 2:12 PM Robert Burke <ro...@frantil.com> wrote:

> Kenn has pointed out to me that Coders are not likely going to vanish in
> the next  while, in particular over the FnAPI, so having a coder registry
> does remain useful, as described by an early adopter in another thread.
>
> On Fri, Jan 4, 2019, 10:51 AM Robert Burke <ro...@frantil.com> wrote:
>
>> I think you're right Kenn.
>>
>> Reuven alluded to the difficulty in inference of what to use between
>> AtomicType and the rest, in particular Struct<Schema>.
>>
>> Go has the additional concerns around Pointer vs Non Pointer types which
>> isn't a concern either Python or Java have, but has implications on
>> pipeline efficiency that need addressing, in particular, being able to use
>> them in a useful fashion in the Go SDK.
>>
>> I agree that long term, having schemas as a default codec would be hugely
>> beneficial for readability, composability, and allows more processing to be
>> on the Runner Harness side of a worker. (I'll save the rest of my thoughts
>> on Schemas in Go for the other thread, and say no more of it here.)
>>
>> *Regarding my proposal for User Defined Coders:*
>>
>> To avoid users accidentally preventing themselves from using Schemas in
>> the future, I need to remove the ability to override the default coder *(4).
>> *Then instead of JSON coding by default *(5)*, the SDK should be doing
>> Schema coding. The SDK is already doing the recursive type analysis on
>> types at pipeline construction time, so it's not a huge stretch to support
>> Schemas using that information in the future, once Runner & FnAPI support
>> begins to exist.
>>
>> *(1)* doesn't seem to need changing, as this is the existing AtomicType
>> definition Kenn pointed out.
>>
>> *(2)* is the specific AtomicType override.
>>
>> *(3) *is the broader Go specific override for Go's unique interface
>> semantics. This most of the cases *(4)* would have covered anyway, but
>> in a targeted way.
>>
>> This should still allow Go users to better control their pipeline, and
>> associated performance implications (which is my goal in this change),
>> while not making an overall incompatible choice for powerful beam features
>> for the common case in the future.
>>
>> Does that sound right?
>>
>> On Fri, 4 Jan 2019 at 10:05 Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> If a user wants custom encoding for a primitive type, they can create a
>>>> byte-array field and wrap that field with a Coder
>>>>
>>>
>>> This is the crux of the issue, right?
>>>
>>> Roughly, today, we've got:
>>>
>>>         Schema ::= [ (fieldname, Type) ]
>>>
>>>         Type ::= AtomicType | Array<Type> | Map<Type, Type> |
>>> Struct<Schema>
>>>
>>>         AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...
>>>
>>> To fully replace custom encodings as they exist, you need:
>>>
>>>         AtomicType ::= bytes<CustomCoder> | ...
>>>
>>> At this point, an SDK need not surface the concept of "Coder" to a user
>>> at all outside the bytes field concept and the wire encoding and efficient
>>> should be identical or nearly to what we do with coders today. PCollections
>>> in such an SDK have schemas, not coders, so we have successfully turned it
>>> completely inside-out relative to how the Java SDK does it. Is that what
>>> you have in mind?
>>>
>>> I really like this, but I agree with Robert that this is a major change
>>> that takes a bunch of work and a lot more collaborative thinking in design
>>> docs if we hope to get it right/stable.
>>>
>>> Kenn
>>>
>>>
>>>> (this is why I said that todays Coders are simply special cases); this
>>>> should be very rare though, as users rarely should care how Beam encodes a
>>>> long or a double.
>>>>
>>>>>
>>>>> Offhand, Schemas seem to be an alternative to pipeline construction,
>>>>> rather than coders for value serialization, allowing manual field
>>>>> extraction code to be omitted. They do not appear to be a fundamental
>>>>> approach to achieve it. For example, the grouping operation still needs to
>>>>> encode the whole of the object as a value.
>>>>>
>>>>
>>>> Schemas are properties of the data - essentially a Schema is the data
>>>> type of a PCollection. In Java Schemas are also understood by ParDo, so you
>>>> can write a ParDo like this:
>>>>
>>>> @ProcessElement
>>>> public void process(@Field("user") String userId,  @Field("country")
>>>> String countryCode) {
>>>> }
>>>>
>>>> These extra functionalities are part of the graph, but they are enabled
>>>> by schemas.
>>>>
>>>>>
>>>>> As mentioned, I'm hoping to have a solution for existing coders by
>>>>> January's end, so waiting for your documentation doesn't work on that
>>>>> timeline.
>>>>>
>>>>
>>>> I don't think we need to wait for all the documentation to be written.
>>>>
>>>>
>>>>>
>>>>> That said, they aren't incompatible ideas as demonstrated by the Java
>>>>> implementation. The Go SDK remains in an experimental state. We can change
>>>>> things should the need arise in the next few months. Further, whenever Generics
>>>>> in Go
>>>>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>>>>> crop up, the existing user surface and execution stack will need to be
>>>>> re-written to take advantage of them anyway. That provides an opportunity
>>>>> to invert Coder vs Schema dependence while getting a nice performance
>>>>> boost, and cleaner code (and deleting much of my code generator).
>>>>>
>>>>> ----
>>>>>
>>>>> Were I to implement schemas to get the same syntatic benefits as the
>>>>> Java API, I'd be leveraging the field annotations Go has. This satisfies
>>>>> the protocol buffer issue as well, since generated go protos have name &
>>>>> json annotations. Schemas could be extracted that way. These are also
>>>>> available to anything using static analysis for more direct generation of
>>>>> accessors. The reflective approach would also work, which is excellent for
>>>>> development purposes.
>>>>>
>>>>> The rote code that the schemas were replacing would be able to be
>>>>> cobbled together into efficient DoFn and CombineFns for serialization. At
>>>>> present, it seems like it could be implemented as a side package that uses
>>>>> beam, rather than changing portions of the core beam Go packages, The real
>>>>> trick would be to do so without "apply" since that's not how the Go SDK is
>>>>> shaped.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>>>>
>>>>>> Reuven, it sounds great. I see there is a similar thing to Row coders
>>>>>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>>>>>> similarity between Apache Arrow Flight
>>>>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>>>>> and data exchange service in portability. How do you see these two things
>>>>>> relate to each other in the long term?
>>>>>>
>>>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> The biggest advantage is actually readability and usability. A
>>>>>>> secondary advantage is that it means that Go will be able to interact
>>>>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>>>>
>>>>>>> A schema is basically a way of saying that a record has a specific
>>>>>>> set of (possibly nested, possibly repeated) fields. So for instance let's
>>>>>>> say that the user's type is a struct with fields named user, country,
>>>>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>>>>> names. Some example (using the Java API):
>>>>>>>
>>>>>>> PCollection users = events.apply(Select.fields("user"));  // Select
>>>>>>> out only the user field.
>>>>>>>
>>>>>>> PCollection joinedEvents =
>>>>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>>>>> PCollections by user.
>>>>>>>
>>>>>>> // For each country, calculate the total purchase cost as well as
>>>>>>> the top 10 purchases.
>>>>>>> // A new schema is created containing fields total_cost and
>>>>>>> top_purchases, and rows are created with the aggregation results.
>>>>>>> PCollection purchaseStatistics = events.apply(
>>>>>>>     Group.byFieldNames("country")
>>>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>>>>> "total_cost"))
>>>>>>>                 .aggregateField("purchaseCost",
>>>>>>> Top.largestLongs(10), "top_purchases"))
>>>>>>>
>>>>>>>
>>>>>>> This is far more readable than what we have today, and what unlocks
>>>>>>> this is that Beam actually knows the structure of the record instead of
>>>>>>> assuming records are uncrackable blobs.
>>>>>>>
>>>>>>> Note that a coder is basically a special case of a schema that has a
>>>>>>> single field.
>>>>>>>
>>>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user
>>>>>>> types into schemas. We use reflection to analyze many user types (e.g.
>>>>>>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>>>>>>> etc.) to determine the schema, however this is done only when the graph is
>>>>>>> initially generated. We do use code generation (in Java we do bytecode
>>>>>>> generation) to make this somewhat more efficient. I'm willing to bet that
>>>>>>> the code generator you've written for structs could be very easily modified
>>>>>>> for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>>>
>>>>>>> One of the things I'm working on now is documenting Beam schemas.
>>>>>>> They are already very powerful and useful, but since there is still nothing
>>>>>>> in our documentation about them, they are not yet widely used. I expect to
>>>>>>> finish draft documentation by the end of January.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That's an interesting idea. I must confess I don't rightly know the
>>>>>>>> difference between a schema and coder, but here's what I've got with a bit
>>>>>>>> of searching through memory and the mailing list. Please let me know if I'm
>>>>>>>> off track.
>>>>>>>>
>>>>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>>>>> many columns that aren't being used, and only extract the data that's
>>>>>>>> meaningful to the pipeline.
>>>>>>>> The trick then is how to apply the schema to a given serialization
>>>>>>>> format, which is something I'm missing in my mental model (and then how to
>>>>>>>> do it efficiently in Go).
>>>>>>>>
>>>>>>>> I do know that the Go client package for BigQuery
>>>>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>>>>> something like that, using field tags. Similarly, the
>>>>>>>> "encoding/json" <https://golang.org/doc/articles/json_and_go.html> package
>>>>>>>> in the Go Standard Library permits annotating fields and it will read out
>>>>>>>> and deserialize the JSON fields and that's it.
>>>>>>>>
>>>>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>>>>> time code generation for schemas to be efficient, and they would still
>>>>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>>>>> doesn't keep up.
>>>>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>>>>> efficient processing of values.
>>>>>>>> It's also not 100% clear how Schema's would play with protocol
>>>>>>>> buffers or similar.
>>>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto
>>>>>>>> file <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>,
>>>>>>>> but that's only the specification half, not the using half.
>>>>>>>>
>>>>>>>> As it stands, the code generator I've been building these last
>>>>>>>> months could (in principle) statically analyze a user's struct, and then
>>>>>>>> generate an efficient dedicated coder for it. It just has no where to put
>>>>>>>> them such that the Go SDK would use it.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>>>>> we're better off starting with schemas instead of coders?
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> One area that the Go SDK currently lacks: is the ability for
>>>>>>>>>> users to specify their own coders for types.
>>>>>>>>>>
>>>>>>>>>> I've written a proposal document,
>>>>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>>>>
>>>>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>>>>
>>>>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>>>>> have a coder registry. In addition, arrays, and maps should be permitted as
>>>>>>>>>> well.
>>>>>>>>>>
>>>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>>>>>> January.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Robert Burke
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> http://go/where-is-rebo
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> Gleb
>>>>>>
>>>>>

Re: [Go SDK] User Defined Coders

Posted by Robert Burke <ro...@frantil.com>.
Kenn has pointed out to me that Coders are not likely going to vanish in
the next  while, in particular over the FnAPI, so having a coder registry
does remain useful, as described by an early adopter in another thread.

On Fri, Jan 4, 2019, 10:51 AM Robert Burke <ro...@frantil.com> wrote:

> I think you're right Kenn.
>
> Reuven alluded to the difficulty in inference of what to use between
> AtomicType and the rest, in particular Struct<Schema>.
>
> Go has the additional concerns around Pointer vs Non Pointer types which
> isn't a concern either Python or Java have, but has implications on
> pipeline efficiency that need addressing, in particular, being able to use
> them in a useful fashion in the Go SDK.
>
> I agree that long term, having schemas as a default codec would be hugely
> beneficial for readability, composability, and allows more processing to be
> on the Runner Harness side of a worker. (I'll save the rest of my thoughts
> on Schemas in Go for the other thread, and say no more of it here.)
>
> *Regarding my proposal for User Defined Coders:*
>
> To avoid users accidentally preventing themselves from using Schemas in
> the future, I need to remove the ability to override the default coder *(4).
> *Then instead of JSON coding by default *(5)*, the SDK should be doing
> Schema coding. The SDK is already doing the recursive type analysis on
> types at pipeline construction time, so it's not a huge stretch to support
> Schemas using that information in the future, once Runner & FnAPI support
> begins to exist.
>
> *(1)* doesn't seem to need changing, as this is the existing AtomicType
> definition Kenn pointed out.
>
> *(2)* is the specific AtomicType override.
>
> *(3) *is the broader Go specific override for Go's unique interface
> semantics. This most of the cases *(4)* would have covered anyway, but in
> a targeted way.
>
> This should still allow Go users to better control their pipeline, and
> associated performance implications (which is my goal in this change),
> while not making an overall incompatible choice for powerful beam features
> for the common case in the future.
>
> Does that sound right?
>
> On Fri, 4 Jan 2019 at 10:05 Kenneth Knowles <ke...@apache.org> wrote:
>
>> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <re...@google.com> wrote:
>>
>>> If a user wants custom encoding for a primitive type, they can create a
>>> byte-array field and wrap that field with a Coder
>>>
>>
>> This is the crux of the issue, right?
>>
>> Roughly, today, we've got:
>>
>>         Schema ::= [ (fieldname, Type) ]
>>
>>         Type ::= AtomicType | Array<Type> | Map<Type, Type> |
>> Struct<Schema>
>>
>>         AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...
>>
>> To fully replace custom encodings as they exist, you need:
>>
>>         AtomicType ::= bytes<CustomCoder> | ...
>>
>> At this point, an SDK need not surface the concept of "Coder" to a user
>> at all outside the bytes field concept and the wire encoding and efficient
>> should be identical or nearly to what we do with coders today. PCollections
>> in such an SDK have schemas, not coders, so we have successfully turned it
>> completely inside-out relative to how the Java SDK does it. Is that what
>> you have in mind?
>>
>> I really like this, but I agree with Robert that this is a major change
>> that takes a bunch of work and a lot more collaborative thinking in design
>> docs if we hope to get it right/stable.
>>
>> Kenn
>>
>>
>>> (this is why I said that todays Coders are simply special cases); this
>>> should be very rare though, as users rarely should care how Beam encodes a
>>> long or a double.
>>>
>>>>
>>>> Offhand, Schemas seem to be an alternative to pipeline construction,
>>>> rather than coders for value serialization, allowing manual field
>>>> extraction code to be omitted. They do not appear to be a fundamental
>>>> approach to achieve it. For example, the grouping operation still needs to
>>>> encode the whole of the object as a value.
>>>>
>>>
>>> Schemas are properties of the data - essentially a Schema is the data
>>> type of a PCollection. In Java Schemas are also understood by ParDo, so you
>>> can write a ParDo like this:
>>>
>>> @ProcessElement
>>> public void process(@Field("user") String userId,  @Field("country")
>>> String countryCode) {
>>> }
>>>
>>> These extra functionalities are part of the graph, but they are enabled
>>> by schemas.
>>>
>>>>
>>>> As mentioned, I'm hoping to have a solution for existing coders by
>>>> January's end, so waiting for your documentation doesn't work on that
>>>> timeline.
>>>>
>>>
>>> I don't think we need to wait for all the documentation to be written.
>>>
>>>
>>>>
>>>> That said, they aren't incompatible ideas as demonstrated by the Java
>>>> implementation. The Go SDK remains in an experimental state. We can change
>>>> things should the need arise in the next few months. Further, whenever Generics
>>>> in Go
>>>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>>>> crop up, the existing user surface and execution stack will need to be
>>>> re-written to take advantage of them anyway. That provides an opportunity
>>>> to invert Coder vs Schema dependence while getting a nice performance
>>>> boost, and cleaner code (and deleting much of my code generator).
>>>>
>>>> ----
>>>>
>>>> Were I to implement schemas to get the same syntatic benefits as the
>>>> Java API, I'd be leveraging the field annotations Go has. This satisfies
>>>> the protocol buffer issue as well, since generated go protos have name &
>>>> json annotations. Schemas could be extracted that way. These are also
>>>> available to anything using static analysis for more direct generation of
>>>> accessors. The reflective approach would also work, which is excellent for
>>>> development purposes.
>>>>
>>>> The rote code that the schemas were replacing would be able to be
>>>> cobbled together into efficient DoFn and CombineFns for serialization. At
>>>> present, it seems like it could be implemented as a side package that uses
>>>> beam, rather than changing portions of the core beam Go packages, The real
>>>> trick would be to do so without "apply" since that's not how the Go SDK is
>>>> shaped.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>>>
>>>>> Reuven, it sounds great. I see there is a similar thing to Row coders
>>>>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>>>>> similarity between Apache Arrow Flight
>>>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>>>> and data exchange service in portability. How do you see these two things
>>>>> relate to each other in the long term?
>>>>>
>>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> The biggest advantage is actually readability and usability. A
>>>>>> secondary advantage is that it means that Go will be able to interact
>>>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>>>
>>>>>> A schema is basically a way of saying that a record has a specific
>>>>>> set of (possibly nested, possibly repeated) fields. So for instance let's
>>>>>> say that the user's type is a struct with fields named user, country,
>>>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>>>> names. Some example (using the Java API):
>>>>>>
>>>>>> PCollection users = events.apply(Select.fields("user"));  // Select
>>>>>> out only the user field.
>>>>>>
>>>>>> PCollection joinedEvents =
>>>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>>>> PCollections by user.
>>>>>>
>>>>>> // For each country, calculate the total purchase cost as well as the
>>>>>> top 10 purchases.
>>>>>> // A new schema is created containing fields total_cost and
>>>>>> top_purchases, and rows are created with the aggregation results.
>>>>>> PCollection purchaseStatistics = events.apply(
>>>>>>     Group.byFieldNames("country")
>>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>>>> "total_cost"))
>>>>>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>>>>>> "top_purchases"))
>>>>>>
>>>>>>
>>>>>> This is far more readable than what we have today, and what unlocks
>>>>>> this is that Beam actually knows the structure of the record instead of
>>>>>> assuming records are uncrackable blobs.
>>>>>>
>>>>>> Note that a coder is basically a special case of a schema that has a
>>>>>> single field.
>>>>>>
>>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user
>>>>>> types into schemas. We use reflection to analyze many user types (e.g.
>>>>>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>>>>>> etc.) to determine the schema, however this is done only when the graph is
>>>>>> initially generated. We do use code generation (in Java we do bytecode
>>>>>> generation) to make this somewhat more efficient. I'm willing to bet that
>>>>>> the code generator you've written for structs could be very easily modified
>>>>>> for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>>
>>>>>> One of the things I'm working on now is documenting Beam schemas.
>>>>>> They are already very powerful and useful, but since there is still nothing
>>>>>> in our documentation about them, they are not yet widely used. I expect to
>>>>>> finish draft documentation by the end of January.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>>>>
>>>>>>> That's an interesting idea. I must confess I don't rightly know the
>>>>>>> difference between a schema and coder, but here's what I've got with a bit
>>>>>>> of searching through memory and the mailing list. Please let me know if I'm
>>>>>>> off track.
>>>>>>>
>>>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>>>> many columns that aren't being used, and only extract the data that's
>>>>>>> meaningful to the pipeline.
>>>>>>> The trick then is how to apply the schema to a given serialization
>>>>>>> format, which is something I'm missing in my mental model (and then how to
>>>>>>> do it efficiently in Go).
>>>>>>>
>>>>>>> I do know that the Go client package for BigQuery
>>>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>>>> something like that, using field tags. Similarly, the
>>>>>>> "encoding/json" <https://golang.org/doc/articles/json_and_go.html> package
>>>>>>> in the Go Standard Library permits annotating fields and it will read out
>>>>>>> and deserialize the JSON fields and that's it.
>>>>>>>
>>>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>>>> time code generation for schemas to be efficient, and they would still
>>>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>>>> doesn't keep up.
>>>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>>>> efficient processing of values.
>>>>>>> It's also not 100% clear how Schema's would play with protocol
>>>>>>> buffers or similar.
>>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto
>>>>>>> file <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>,
>>>>>>> but that's only the specification half, not the using half.
>>>>>>>
>>>>>>> As it stands, the code generator I've been building these last
>>>>>>> months could (in principle) statically analyze a user's struct, and then
>>>>>>> generate an efficient dedicated coder for it. It just has no where to put
>>>>>>> them such that the Go SDK would use it.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>>>> we're better off starting with schemas instead of coders?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> One area that the Go SDK currently lacks: is the ability for users
>>>>>>>>> to specify their own coders for types.
>>>>>>>>>
>>>>>>>>> I've written a proposal document,
>>>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>>>
>>>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>>>
>>>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>>>> have a coder registry. In addition, arrays, and maps should be permitted as
>>>>>>>>> well.
>>>>>>>>>
>>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>>>>> January.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Robert Burke
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> http://go/where-is-rebo
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> Gleb
>>>>>
>>>>

Re: [Go SDK] User Defined Coders

Posted by Robert Burke <ro...@frantil.com>.
I think you're right Kenn.

Reuven alluded to the difficulty in inference of what to use between
AtomicType and the rest, in particular Struct<Schema>.

Go has the additional concerns around Pointer vs Non Pointer types which
isn't a concern either Python or Java have, but has implications on
pipeline efficiency that need addressing, in particular, being able to use
them in a useful fashion in the Go SDK.

I agree that long term, having schemas as a default codec would be hugely
beneficial for readability, composability, and allows more processing to be
on the Runner Harness side of a worker. (I'll save the rest of my thoughts
on Schemas in Go for the other thread, and say no more of it here.)

*Regarding my proposal for User Defined Coders:*

To avoid users accidentally preventing themselves from using Schemas in the
future, I need to remove the ability to override the default coder *(4). *Then
instead of JSON coding by default *(5)*, the SDK should be doing Schema
coding. The SDK is already doing the recursive type analysis on types at
pipeline construction time, so it's not a huge stretch to support Schemas
using that information in the future, once Runner & FnAPI support begins to
exist.

*(1)* doesn't seem to need changing, as this is the existing AtomicType
definition Kenn pointed out.

*(2)* is the specific AtomicType override.

*(3) *is the broader Go specific override for Go's unique interface
semantics. This most of the cases *(4)* would have covered anyway, but in a
targeted way.

This should still allow Go users to better control their pipeline, and
associated performance implications (which is my goal in this change),
while not making an overall incompatible choice for powerful beam features
for the common case in the future.

Does that sound right?

On Fri, 4 Jan 2019 at 10:05 Kenneth Knowles <ke...@apache.org> wrote:

> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <re...@google.com> wrote:
>
>> If a user wants custom encoding for a primitive type, they can create a
>> byte-array field and wrap that field with a Coder
>>
>
> This is the crux of the issue, right?
>
> Roughly, today, we've got:
>
>         Schema ::= [ (fieldname, Type) ]
>
>         Type ::= AtomicType | Array<Type> | Map<Type, Type> |
> Struct<Schema>
>
>         AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...
>
> To fully replace custom encodings as they exist, you need:
>
>         AtomicType ::= bytes<CustomCoder> | ...
>
> At this point, an SDK need not surface the concept of "Coder" to a user at
> all outside the bytes field concept and the wire encoding and efficient
> should be identical or nearly to what we do with coders today. PCollections
> in such an SDK have schemas, not coders, so we have successfully turned it
> completely inside-out relative to how the Java SDK does it. Is that what
> you have in mind?
>
> I really like this, but I agree with Robert that this is a major change
> that takes a bunch of work and a lot more collaborative thinking in design
> docs if we hope to get it right/stable.
>
> Kenn
>
>
>> (this is why I said that todays Coders are simply special cases); this
>> should be very rare though, as users rarely should care how Beam encodes a
>> long or a double.
>>
>>>
>>> Offhand, Schemas seem to be an alternative to pipeline construction,
>>> rather than coders for value serialization, allowing manual field
>>> extraction code to be omitted. They do not appear to be a fundamental
>>> approach to achieve it. For example, the grouping operation still needs to
>>> encode the whole of the object as a value.
>>>
>>
>> Schemas are properties of the data - essentially a Schema is the data
>> type of a PCollection. In Java Schemas are also understood by ParDo, so you
>> can write a ParDo like this:
>>
>> @ProcessElement
>> public void process(@Field("user") String userId,  @Field("country")
>> String countryCode) {
>> }
>>
>> These extra functionalities are part of the graph, but they are enabled
>> by schemas.
>>
>>>
>>> As mentioned, I'm hoping to have a solution for existing coders by
>>> January's end, so waiting for your documentation doesn't work on that
>>> timeline.
>>>
>>
>> I don't think we need to wait for all the documentation to be written.
>>
>>
>>>
>>> That said, they aren't incompatible ideas as demonstrated by the Java
>>> implementation. The Go SDK remains in an experimental state. We can change
>>> things should the need arise in the next few months. Further, whenever Generics
>>> in Go
>>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>>> crop up, the existing user surface and execution stack will need to be
>>> re-written to take advantage of them anyway. That provides an opportunity
>>> to invert Coder vs Schema dependence while getting a nice performance
>>> boost, and cleaner code (and deleting much of my code generator).
>>>
>>> ----
>>>
>>> Were I to implement schemas to get the same syntatic benefits as the
>>> Java API, I'd be leveraging the field annotations Go has. This satisfies
>>> the protocol buffer issue as well, since generated go protos have name &
>>> json annotations. Schemas could be extracted that way. These are also
>>> available to anything using static analysis for more direct generation of
>>> accessors. The reflective approach would also work, which is excellent for
>>> development purposes.
>>>
>>> The rote code that the schemas were replacing would be able to be
>>> cobbled together into efficient DoFn and CombineFns for serialization. At
>>> present, it seems like it could be implemented as a side package that uses
>>> beam, rather than changing portions of the core beam Go packages, The real
>>> trick would be to do so without "apply" since that's not how the Go SDK is
>>> shaped.
>>>
>>>
>>>
>>>
>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>>
>>>> Reuven, it sounds great. I see there is a similar thing to Row coders
>>>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>>>> similarity between Apache Arrow Flight
>>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>>> and data exchange service in portability. How do you see these two things
>>>> relate to each other in the long term?
>>>>
>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> The biggest advantage is actually readability and usability. A
>>>>> secondary advantage is that it means that Go will be able to interact
>>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>>
>>>>> A schema is basically a way of saying that a record has a specific set
>>>>> of (possibly nested, possibly repeated) fields. So for instance let's say
>>>>> that the user's type is a struct with fields named user, country,
>>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>>> names. Some example (using the Java API):
>>>>>
>>>>> PCollection users = events.apply(Select.fields("user"));  // Select
>>>>> out only the user field.
>>>>>
>>>>> PCollection joinedEvents =
>>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>>> PCollections by user.
>>>>>
>>>>> // For each country, calculate the total purchase cost as well as the
>>>>> top 10 purchases.
>>>>> // A new schema is created containing fields total_cost and
>>>>> top_purchases, and rows are created with the aggregation results.
>>>>> PCollection purchaseStatistics = events.apply(
>>>>>     Group.byFieldNames("country")
>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>>> "total_cost"))
>>>>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>>>>> "top_purchases"))
>>>>>
>>>>>
>>>>> This is far more readable than what we have today, and what unlocks
>>>>> this is that Beam actually knows the structure of the record instead of
>>>>> assuming records are uncrackable blobs.
>>>>>
>>>>> Note that a coder is basically a special case of a schema that has a
>>>>> single field.
>>>>>
>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user
>>>>> types into schemas. We use reflection to analyze many user types (e.g.
>>>>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>>>>> etc.) to determine the schema, however this is done only when the graph is
>>>>> initially generated. We do use code generation (in Java we do bytecode
>>>>> generation) to make this somewhat more efficient. I'm willing to bet that
>>>>> the code generator you've written for structs could be very easily modified
>>>>> for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>
>>>>> One of the things I'm working on now is documenting Beam schemas. They
>>>>> are already very powerful and useful, but since there is still nothing in
>>>>> our documentation about them, they are not yet widely used. I expect to
>>>>> finish draft documentation by the end of January.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>>>
>>>>>> That's an interesting idea. I must confess I don't rightly know the
>>>>>> difference between a schema and coder, but here's what I've got with a bit
>>>>>> of searching through memory and the mailing list. Please let me know if I'm
>>>>>> off track.
>>>>>>
>>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>>> many columns that aren't being used, and only extract the data that's
>>>>>> meaningful to the pipeline.
>>>>>> The trick then is how to apply the schema to a given serialization
>>>>>> format, which is something I'm missing in my mental model (and then how to
>>>>>> do it efficiently in Go).
>>>>>>
>>>>>> I do know that the Go client package for BigQuery
>>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>>> something like that, using field tags. Similarly, the "encoding/json"
>>>>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>>>>> Standard Library permits annotating fields and it will read out and
>>>>>> deserialize the JSON fields and that's it.
>>>>>>
>>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>>> time code generation for schemas to be efficient, and they would still
>>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>>> doesn't keep up.
>>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>>> efficient processing of values.
>>>>>> It's also not 100% clear how Schema's would play with protocol
>>>>>> buffers or similar.
>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>>>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>>>>> that's only the specification half, not the using half.
>>>>>>
>>>>>> As it stands, the code generator I've been building these last months
>>>>>> could (in principle) statically analyze a user's struct, and then generate
>>>>>> an efficient dedicated coder for it. It just has no where to put them such
>>>>>> that the Go SDK would use it.
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>>> we're better off starting with schemas instead of coders?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> One area that the Go SDK currently lacks: is the ability for users
>>>>>>>> to specify their own coders for types.
>>>>>>>>
>>>>>>>> I've written a proposal document,
>>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>>
>>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>>
>>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>>> have a coder registry. In addition, arrays, and maps should be permitted as
>>>>>>>> well.
>>>>>>>>
>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>>>> January.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Robert Burke
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> http://go/where-is-rebo
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> Gleb
>>>>
>>>

Re: [Go SDK] User Defined Coders

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Jan 4, 2019 at 7:05 PM Kenneth Knowles <ke...@apache.org> wrote:
>
> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <re...@google.com> wrote:
>>
>> If a user wants custom encoding for a primitive type, they can create a byte-array field and wrap that field with a Coder

I don't think the primary use of coders is a custom encoding for
primitive types, rather it's to provide any encoding for a custom
type. This is what schemas are lacking now.

On the other hand, using coders at the Runner level gives the runner
flexibility in how it wants even well-known types (e.g. windowed
types, lazy iterables) to be encoded for easy manipulation.

> This is the crux of the issue, right?
>
> Roughly, today, we've got:
>
>         Schema ::= [ (fieldname, Type) ]
>
>         Type ::= AtomicType | Array<Type> | Map<Type, Type> | Struct<Schema>
>
>         AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...

This is starting to look a lot like a re-implementation of protocol
buffers... Perhaps the advantage is that it's more lightweight?

If you squint, it also looks isomorphic to Type == WellKnownCoder.
Which, if we allow extensibility, becomes Type == Coder. (Possibly we
could simplify the nested/unnested/deterministic logic by constraining
unknown coders though.)

> To fully replace custom encodings as they exist, you need:
>
>         AtomicType ::= bytes<CustomCoder> | ...
>
> At this point, an SDK need not surface the concept of "Coder" to a user at all outside the bytes field concept and the wire encoding and efficient should be identical or nearly to what we do with coders today. PCollections in such an SDK have schemas, not coders, so we have successfully turned it completely inside-out relative to how the Java SDK does it. Is that what you have in mind?

If a schema had a Type parameterizable by an encode/decode method
(a.k.a. a Coder) we could do this inversion. The primary difference is
that one would now assert that all PCollections have a composite
record structure, even if they only contain one field. (Under what
conditions would we flatten [ fieldname: SchemaX ] to SchemaX?)

> I really like this, but I agree with Robert that this is a major change that takes a bunch of work and a lot more collaborative thinking in design docs if we hope to get it right/stable.
>
> Kenn
>
>>
>> (this is why I said that todays Coders are simply special cases); this should be very rare though, as users rarely should care how Beam encodes a long or a double.
>>>
>>>
>>> Offhand, Schemas seem to be an alternative to pipeline construction, rather than coders for value serialization, allowing manual field extraction code to be omitted. They do not appear to be a fundamental approach to achieve it. For example, the grouping operation still needs to encode the whole of the object as a value.
>>
>>
>> Schemas are properties of the data - essentially a Schema is the data type of a PCollection. In Java Schemas are also understood by ParDo, so you can write a ParDo like this:
>>
>> @ProcessElement
>> public void process(@Field("user") String userId,  @Field("country") String countryCode) {
>> }
>>
>> These extra functionalities are part of the graph, but they are enabled by schemas.
>>>
>>>
>>> As mentioned, I'm hoping to have a solution for existing coders by January's end, so waiting for your documentation doesn't work on that timeline.
>>
>>
>> I don't think we need to wait for all the documentation to be written.
>>
>>>
>>>
>>> That said, they aren't incompatible ideas as demonstrated by the Java implementation. The Go SDK remains in an experimental state. We can change things should the need arise in the next few months. Further, whenever Generics in Go crop up, the existing user surface and execution stack will need to be re-written to take advantage of them anyway. That provides an opportunity to invert Coder vs Schema dependence while getting a nice performance boost, and cleaner code (and deleting much of my code generator).
>>>
>>> ----
>>>
>>> Were I to implement schemas to get the same syntatic benefits as the Java API, I'd be leveraging the field annotations Go has. This satisfies the protocol buffer issue as well, since generated go protos have name & json annotations. Schemas could be extracted that way. These are also available to anything using static analysis for more direct generation of accessors. The reflective approach would also work, which is excellent for development purposes.
>>>
>>> The rote code that the schemas were replacing would be able to be cobbled together into efficient DoFn and CombineFns for serialization. At present, it seems like it could be implemented as a side package that uses beam, rather than changing portions of the core beam Go packages, The real trick would be to do so without "apply" since that's not how the Go SDK is shaped.
>>>
>>>
>>>
>>>
>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>>>
>>>> Reuven, it sounds great. I see there is a similar thing to Row coders happening in Apache Arrow, and there is a similarity between Apache Arrow Flight and data exchange service in portability. How do you see these two things relate to each other in the long term?
>>>>
>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>> The biggest advantage is actually readability and usability. A secondary advantage is that it means that Go will be able to interact seamlessly with BeamSQL, which would be a big win for Go.
>>>>>
>>>>> A schema is basically a way of saying that a record has a specific set of (possibly nested, possibly repeated) fields. So for instance let's say that the user's type is a struct with fields named user, country, purchaseCost. This allows us to provide transforms that operate on field names. Some example (using the Java API):
>>>>>
>>>>> PCollection users = events.apply(Select.fields("user"));  // Select out only the user field.
>>>>>
>>>>> PCollection joinedEvents = queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two PCollections by user.
>>>>>
>>>>> // For each country, calculate the total purchase cost as well as the top 10 purchases.
>>>>> // A new schema is created containing fields total_cost and top_purchases, and rows are created with the aggregation results.
>>>>> PCollection purchaseStatistics = events.apply(
>>>>>     Group.byFieldNames("country")
>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost"))
>>>>>                 .aggregateField("purchaseCost", Top.largestLongs(10), "top_purchases"))
>>>>>
>>>>>
>>>>> This is far more readable than what we have today, and what unlocks this is that Beam actually knows the structure of the record instead of assuming records are uncrackable blobs.
>>>>>
>>>>> Note that a coder is basically a special case of a schema that has a single field.
>>>>>
>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user types into schemas. We use reflection to analyze many user types (e.g. simple POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to determine the schema, however this is done only when the graph is initially generated. We do use code generation (in Java we do bytecode generation) to make this somewhat more efficient. I'm willing to bet that the code generator you've written for structs could be very easily modified for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>
>>>>> One of the things I'm working on now is documenting Beam schemas. They are already very powerful and useful, but since there is still nothing in our documentation about them, they are not yet widely used. I expect to finish draft documentation by the end of January.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>>>>
>>>>>> That's an interesting idea. I must confess I don't rightly know the difference between a schema and coder, but here's what I've got with a bit of searching through memory and the mailing list. Please let me know if I'm off track.
>>>>>>
>>>>>> As near as I can tell, a schema, as far as Beam takes it is a mechanism to define what data is extracted from a given row of data. So in principle, there's an opportunity to be more efficient with data with many columns that aren't being used, and only extract the data that's meaningful to the pipeline.
>>>>>> The trick then is how to apply the schema to a given serialization format, which is something I'm missing in my mental model (and then how to do it efficiently in Go).
>>>>>>
>>>>>> I do know that the Go client package for BigQuery does something like that, using field tags. Similarly, the "encoding/json" package in the Go Standard Library permits annotating fields and it will read out and deserialize the JSON fields and that's it.
>>>>>>
>>>>>> A concern I have is that Go (at present) would require pre-compile time code generation for schemas to be efficient, and they would still mostly boil down to turning []bytes into real structs. Go reflection doesn't keep up.
>>>>>> Go has no mechanism I'm aware of to Just In Time compile more efficient processing of values.
>>>>>> It's also not 100% clear how Schema's would play with protocol buffers or similar.
>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto file, but that's only the specification half, not the using half.
>>>>>>
>>>>>> As it stands, the code generator I've been building these last months could (in principle) statically analyze a user's struct, and then generate an efficient dedicated coder for it. It just has no where to put them such that the Go SDK would use it.
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>> I'll make a different suggestion. There's been some chatter that schemas are a better tool than coders, and that in Beam 3.0 we should make schemas the basic semantics instead of coders. Schemas provide everything a coder provides, but also allows for far more readable code. We can't make such a change in Beam Java 2.X for compatibility reasons, but maybe in Go we're better off starting with schemas instead of coders?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>>>>>>>>
>>>>>>>> One area that the Go SDK currently lacks: is the ability for users to specify their own coders for types.
>>>>>>>>
>>>>>>>> I've written a proposal document, and while I'm confident about the core, there are certainly some edge cases that require discussion before getting on with the implementation.
>>>>>>>>
>>>>>>>> At presently, the SDK only permits primitive value types (all numeric types but complex, strings, and []bytes) which are coded with beam coders, and structs whose exported fields are of those type, which is then encoded as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer, and presents the current work around this issue.
>>>>>>>>
>>>>>>>> The high level proposal is to catch up with Python and Java, and have a coder registry. In addition, arrays, and maps should be permitted as well.
>>>>>>>>
>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd love to hear them! Otherwise my intent is to get a PR ready by the end of January.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Robert Burke
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> http://go/where-is-rebo
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> Gleb

Re: [Go SDK] User Defined Coders

Posted by Kenneth Knowles <ke...@apache.org>.
On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <re...@google.com> wrote:

> If a user wants custom encoding for a primitive type, they can create a
> byte-array field and wrap that field with a Coder
>

This is the crux of the issue, right?

Roughly, today, we've got:

        Schema ::= [ (fieldname, Type) ]

        Type ::= AtomicType | Array<Type> | Map<Type, Type> | Struct<Schema>

        AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...

To fully replace custom encodings as they exist, you need:

        AtomicType ::= bytes<CustomCoder> | ...

At this point, an SDK need not surface the concept of "Coder" to a user at
all outside the bytes field concept and the wire encoding and efficient
should be identical or nearly to what we do with coders today. PCollections
in such an SDK have schemas, not coders, so we have successfully turned it
completely inside-out relative to how the Java SDK does it. Is that what
you have in mind?

I really like this, but I agree with Robert that this is a major change
that takes a bunch of work and a lot more collaborative thinking in design
docs if we hope to get it right/stable.

Kenn


> (this is why I said that todays Coders are simply special cases); this
> should be very rare though, as users rarely should care how Beam encodes a
> long or a double.
>
>>
>> Offhand, Schemas seem to be an alternative to pipeline construction,
>> rather than coders for value serialization, allowing manual field
>> extraction code to be omitted. They do not appear to be a fundamental
>> approach to achieve it. For example, the grouping operation still needs to
>> encode the whole of the object as a value.
>>
>
> Schemas are properties of the data - essentially a Schema is the data type
> of a PCollection. In Java Schemas are also understood by ParDo, so you can
> write a ParDo like this:
>
> @ProcessElement
> public void process(@Field("user") String userId,  @Field("country")
> String countryCode) {
> }
>
> These extra functionalities are part of the graph, but they are enabled by
> schemas.
>
>>
>> As mentioned, I'm hoping to have a solution for existing coders by
>> January's end, so waiting for your documentation doesn't work on that
>> timeline.
>>
>
> I don't think we need to wait for all the documentation to be written.
>
>
>>
>> That said, they aren't incompatible ideas as demonstrated by the Java
>> implementation. The Go SDK remains in an experimental state. We can change
>> things should the need arise in the next few months. Further, whenever Generics
>> in Go
>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>> crop up, the existing user surface and execution stack will need to be
>> re-written to take advantage of them anyway. That provides an opportunity
>> to invert Coder vs Schema dependence while getting a nice performance
>> boost, and cleaner code (and deleting much of my code generator).
>>
>> ----
>>
>> Were I to implement schemas to get the same syntatic benefits as the Java
>> API, I'd be leveraging the field annotations Go has. This satisfies the
>> protocol buffer issue as well, since generated go protos have name & json
>> annotations. Schemas could be extracted that way. These are also available
>> to anything using static analysis for more direct generation of accessors.
>> The reflective approach would also work, which is excellent for development
>> purposes.
>>
>> The rote code that the schemas were replacing would be able to be cobbled
>> together into efficient DoFn and CombineFns for serialization. At present,
>> it seems like it could be implemented as a side package that uses beam,
>> rather than changing portions of the core beam Go packages, The real trick
>> would be to do so without "apply" since that's not how the Go SDK is shaped.
>>
>>
>>
>>
>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>>
>>> Reuven, it sounds great. I see there is a similar thing to Row coders
>>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>>> similarity between Apache Arrow Flight
>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>> and data exchange service in portability. How do you see these two things
>>> relate to each other in the long term?
>>>
>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The biggest advantage is actually readability and usability. A
>>>> secondary advantage is that it means that Go will be able to interact
>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>
>>>> A schema is basically a way of saying that a record has a specific set
>>>> of (possibly nested, possibly repeated) fields. So for instance let's say
>>>> that the user's type is a struct with fields named user, country,
>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>> names. Some example (using the Java API):
>>>>
>>>> PCollection users = events.apply(Select.fields("user"));  // Select out
>>>> only the user field.
>>>>
>>>> PCollection joinedEvents =
>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>> PCollections by user.
>>>>
>>>> // For each country, calculate the total purchase cost as well as the
>>>> top 10 purchases.
>>>> // A new schema is created containing fields total_cost and
>>>> top_purchases, and rows are created with the aggregation results.
>>>> PCollection purchaseStatistics = events.apply(
>>>>     Group.byFieldNames("country")
>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>> "total_cost"))
>>>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>>>> "top_purchases"))
>>>>
>>>>
>>>> This is far more readable than what we have today, and what unlocks
>>>> this is that Beam actually knows the structure of the record instead of
>>>> assuming records are uncrackable blobs.
>>>>
>>>> Note that a coder is basically a special case of a schema that has a
>>>> single field.
>>>>
>>>> In BeamJava we have a SchemaRegistry which knows how to turn user types
>>>> into schemas. We use reflection to analyze many user types (e.g. simple
>>>> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to
>>>> determine the schema, however this is done only when the graph is initially
>>>> generated. We do use code generation (in Java we do bytecode generation) to
>>>> make this somewhat more efficient. I'm willing to bet that the code
>>>> generator you've written for structs could be very easily modified for
>>>> schemas instead, so it would not be wasted work if we went with schemas.
>>>>
>>>> One of the things I'm working on now is documenting Beam schemas. They
>>>> are already very powerful and useful, but since there is still nothing in
>>>> our documentation about them, they are not yet widely used. I expect to
>>>> finish draft documentation by the end of January.
>>>>
>>>> Reuven
>>>>
>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>>
>>>>> That's an interesting idea. I must confess I don't rightly know the
>>>>> difference between a schema and coder, but here's what I've got with a bit
>>>>> of searching through memory and the mailing list. Please let me know if I'm
>>>>> off track.
>>>>>
>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>> many columns that aren't being used, and only extract the data that's
>>>>> meaningful to the pipeline.
>>>>> The trick then is how to apply the schema to a given serialization
>>>>> format, which is something I'm missing in my mental model (and then how to
>>>>> do it efficiently in Go).
>>>>>
>>>>> I do know that the Go client package for BigQuery
>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>> something like that, using field tags. Similarly, the "encoding/json"
>>>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>>>> Standard Library permits annotating fields and it will read out and
>>>>> deserialize the JSON fields and that's it.
>>>>>
>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>> time code generation for schemas to be efficient, and they would still
>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>> doesn't keep up.
>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>> efficient processing of values.
>>>>> It's also not 100% clear how Schema's would play with protocol buffers
>>>>> or similar.
>>>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>>>> that's only the specification half, not the using half.
>>>>>
>>>>> As it stands, the code generator I've been building these last months
>>>>> could (in principle) statically analyze a user's struct, and then generate
>>>>> an efficient dedicated coder for it. It just has no where to put them such
>>>>> that the Go SDK would use it.
>>>>>
>>>>>
>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>>> we're better off starting with schemas instead of coders?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>>> wrote:
>>>>>>
>>>>>>> One area that the Go SDK currently lacks: is the ability for users
>>>>>>> to specify their own coders for types.
>>>>>>>
>>>>>>> I've written a proposal document,
>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>
>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>> numeric types but complex, strings, and []bytes) which are coded with beam
>>>>>>> coders, and structs whose exported fields are of those type, which is then
>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>
>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>> have a coder registry. In addition, arrays, and maps should be permitted as
>>>>>>> well.
>>>>>>>
>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>>> January.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Robert Burke
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> http://go/where-is-rebo
>>>>>
>>>>
>>>
>>> --
>>> Cheers,
>>> Gleb
>>>
>>

Re: [Go SDK] User Defined Coders

Posted by Reuven Lax <re...@google.com>.
On Fri, Jan 4, 2019 at 1:19 AM Robert Burke <ro...@frantil.com> wrote:

> Very interesting Reuven!
>
> That would be a huge readability improvement, but it would also be a
> significant investment over my time budget to implement them on the Go side
> correctly. I would certainly want to read your documentation before going
> ahead.  Will the Portability FnAPI have dedicated Schema support? That
> would certainly change things.
>

Yes, there's absolutely a plan to add schema definitions to the FnAPI. This
is what will allow you to use SQL from BeamGo

>
> It's not clear to me how one might achieve the inversion from SchemaCoder
> being a special casing of CustomCoder to the other way around, since a
> field has a type, and that type needs to be encoded. Short of always
> encoding the primitive values in the way Beam prefers, it doesn't seem to
> allow for customizing the encoding on output, or really say anything
> outside of the (admittedly excellent) syntactic sugar demonstrated with the
> Java API.
>

I'm not quite sure I understand. But schemas define a fixed set of
primitive types, and also define the encodings for those primitive types.
If a user wants custom encoding for a primitive type, they can create a
byte-array field and wrap that field with a Coder (this is why I said that
todays Coders are simply special cases); this should be very rare though,
as users rarely should care how Beam encodes a long or a double.

>
> Offhand, Schemas seem to be an alternative to pipeline construction,
> rather than coders for value serialization, allowing manual field
> extraction code to be omitted. They do not appear to be a fundamental
> approach to achieve it. For example, the grouping operation still needs to
> encode the whole of the object as a value.
>

Schemas are properties of the data - essentially a Schema is the data type
of a PCollection. In Java Schemas are also understood by ParDo, so you can
write a ParDo like this:

@ProcessElement
public void process(@Field("user") String userId,  @Field("country") String
countryCode) {
}

These extra functionalities are part of the graph, but they are enabled by
schemas.

>
> As mentioned, I'm hoping to have a solution for existing coders by
> January's end, so waiting for your documentation doesn't work on that
> timeline.
>

I don't think we need to wait for all the documentation to be written.


>
> That said, they aren't incompatible ideas as demonstrated by the Java
> implementation. The Go SDK remains in an experimental state. We can change
> things should the need arise in the next few months. Further, whenever Generics
> in Go
> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
> crop up, the existing user surface and execution stack will need to be
> re-written to take advantage of them anyway. That provides an opportunity
> to invert Coder vs Schema dependence while getting a nice performance
> boost, and cleaner code (and deleting much of my code generator).
>
> ----
>
> Were I to implement schemas to get the same syntatic benefits as the Java
> API, I'd be leveraging the field annotations Go has. This satisfies the
> protocol buffer issue as well, since generated go protos have name & json
> annotations. Schemas could be extracted that way. These are also available
> to anything using static analysis for more direct generation of accessors.
> The reflective approach would also work, which is excellent for development
> purposes.
>
> The rote code that the schemas were replacing would be able to be cobbled
> together into efficient DoFn and CombineFns for serialization. At present,
> it seems like it could be implemented as a side package that uses beam,
> rather than changing portions of the core beam Go packages, The real trick
> would be to do so without "apply" since that's not how the Go SDK is shaped.
>
>
>
>
> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:
>
>> Reuven, it sounds great. I see there is a similar thing to Row coders
>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>> similarity between Apache Arrow Flight
>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>> and data exchange service in portability. How do you see these two things
>> relate to each other in the long term?
>>
>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>
>>> The biggest advantage is actually readability and usability. A secondary
>>> advantage is that it means that Go will be able to interact seamlessly with
>>> BeamSQL, which would be a big win for Go.
>>>
>>> A schema is basically a way of saying that a record has a specific set
>>> of (possibly nested, possibly repeated) fields. So for instance let's say
>>> that the user's type is a struct with fields named user, country,
>>> purchaseCost. This allows us to provide transforms that operate on field
>>> names. Some example (using the Java API):
>>>
>>> PCollection users = events.apply(Select.fields("user"));  // Select out
>>> only the user field.
>>>
>>> PCollection joinedEvents =
>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>> PCollections by user.
>>>
>>> // For each country, calculate the total purchase cost as well as the
>>> top 10 purchases.
>>> // A new schema is created containing fields total_cost and
>>> top_purchases, and rows are created with the aggregation results.
>>> PCollection purchaseStatistics = events.apply(
>>>     Group.byFieldNames("country")
>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>> "total_cost"))
>>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>>> "top_purchases"))
>>>
>>>
>>> This is far more readable than what we have today, and what unlocks this
>>> is that Beam actually knows the structure of the record instead of assuming
>>> records are uncrackable blobs.
>>>
>>> Note that a coder is basically a special case of a schema that has a
>>> single field.
>>>
>>> In BeamJava we have a SchemaRegistry which knows how to turn user types
>>> into schemas. We use reflection to analyze many user types (e.g. simple
>>> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to
>>> determine the schema, however this is done only when the graph is initially
>>> generated. We do use code generation (in Java we do bytecode generation) to
>>> make this somewhat more efficient. I'm willing to bet that the code
>>> generator you've written for structs could be very easily modified for
>>> schemas instead, so it would not be wasted work if we went with schemas.
>>>
>>> One of the things I'm working on now is documenting Beam schemas. They
>>> are already very powerful and useful, but since there is still nothing in
>>> our documentation about them, they are not yet widely used. I expect to
>>> finish draft documentation by the end of January.
>>>
>>> Reuven
>>>
>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>>
>>>> That's an interesting idea. I must confess I don't rightly know the
>>>> difference between a schema and coder, but here's what I've got with a bit
>>>> of searching through memory and the mailing list. Please let me know if I'm
>>>> off track.
>>>>
>>>> As near as I can tell, a schema, as far as Beam takes it
>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>>> a mechanism to define what data is extracted from a given row of data. So
>>>> in principle, there's an opportunity to be more efficient with data with
>>>> many columns that aren't being used, and only extract the data that's
>>>> meaningful to the pipeline.
>>>> The trick then is how to apply the schema to a given serialization
>>>> format, which is something I'm missing in my mental model (and then how to
>>>> do it efficiently in Go).
>>>>
>>>> I do know that the Go client package for BigQuery
>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>> something like that, using field tags. Similarly, the "encoding/json"
>>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>>> Standard Library permits annotating fields and it will read out and
>>>> deserialize the JSON fields and that's it.
>>>>
>>>> A concern I have is that Go (at present) would require pre-compile time
>>>> code generation for schemas to be efficient, and they would still mostly
>>>> boil down to turning []bytes into real structs. Go reflection doesn't keep
>>>> up.
>>>> Go has no mechanism I'm aware of to Just In Time compile more efficient
>>>> processing of values.
>>>> It's also not 100% clear how Schema's would play with protocol buffers
>>>> or similar.
>>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>>> that's only the specification half, not the using half.
>>>>
>>>> As it stands, the code generator I've been building these last months
>>>> could (in principle) statically analyze a user's struct, and then generate
>>>> an efficient dedicated coder for it. It just has no where to put them such
>>>> that the Go SDK would use it.
>>>>
>>>>
>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I'll make a different suggestion. There's been some chatter that
>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>>> coder provides, but also allows for far more readable code. We can't make
>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>>> we're better off starting with schemas instead of coders?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com>
>>>>> wrote:
>>>>>
>>>>>> One area that the Go SDK currently lacks: is the ability for users to
>>>>>> specify their own coders for types.
>>>>>>
>>>>>> I've written a proposal document,
>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>> that require discussion before getting on with the implementation.
>>>>>>
>>>>>> At presently, the SDK only permits primitive value types (all numeric
>>>>>> types but complex, strings, and []bytes) which are coded with beam coders,
>>>>>> and structs whose exported fields are of those type, which is then encoded
>>>>>> as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer,
>>>>>> and presents the current work around this issue.
>>>>>>
>>>>>> The high level proposal is to catch up with Python and Java, and have
>>>>>> a coder registry. In addition, arrays, and maps should be permitted as well.
>>>>>>
>>>>>> If you have alternatives, or other suggestions and opinions, I'd love
>>>>>> to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>>> January.
>>>>>>
>>>>>> Thanks!
>>>>>> Robert Burke
>>>>>>
>>>>>
>>>>
>>>> --
>>>> http://go/where-is-rebo
>>>>
>>>
>>
>> --
>> Cheers,
>> Gleb
>>
>

Re: [Go SDK] User Defined Coders

Posted by Robert Burke <ro...@frantil.com>.
Very interesting Reuven!

That would be a huge readability improvement, but it would also be a
significant investment over my time budget to implement them on the Go side
correctly. I would certainly want to read your documentation before going
ahead.  Will the Portability FnAPI have dedicated Schema support? That
would certainly change things.

It's not clear to me how one might achieve the inversion from SchemaCoder
being a special casing of CustomCoder to the other way around, since a
field has a type, and that type needs to be encoded. Short of always
encoding the primitive values in the way Beam prefers, it doesn't seem to
allow for customizing the encoding on output, or really say anything
outside of the (admittedly excellent) syntactic sugar demonstrated with the
Java API.

Offhand, Schemas seem to be an alternative to pipeline construction, rather
than coders for value serialization, allowing manual field extraction code
to be omitted. They do not appear to be a fundamental approach to achieve
it. For example, the grouping operation still needs to encode the whole of
the object as a value.

As mentioned, I'm hoping to have a solution for existing coders by
January's end, so waiting for your documentation doesn't work on that
timeline.

That said, they aren't incompatible ideas as demonstrated by the Java
implementation. The Go SDK remains in an experimental state. We can change
things should the need arise in the next few months. Further, whenever Generics
in Go
<https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
crop up, the existing user surface and execution stack will need to be
re-written to take advantage of them anyway. That provides an opportunity
to invert Coder vs Schema dependence while getting a nice performance
boost, and cleaner code (and deleting much of my code generator).

----

Were I to implement schemas to get the same syntatic benefits as the Java
API, I'd be leveraging the field annotations Go has. This satisfies the
protocol buffer issue as well, since generated go protos have name & json
annotations. Schemas could be extracted that way. These are also available
to anything using static analysis for more direct generation of accessors.
The reflective approach would also work, which is excellent for development
purposes.

The rote code that the schemas were replacing would be able to be cobbled
together into efficient DoFn and CombineFns for serialization. At present,
it seems like it could be implemented as a side package that uses beam,
rather than changing portions of the core beam Go packages, The real trick
would be to do so without "apply" since that's not how the Go SDK is shaped.




On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <gl...@spotify.com> wrote:

> Reuven, it sounds great. I see there is a similar thing to Row coders
> happening in Apache Arrow <https://arrow.apache.org>, and there is a
> similarity between Apache Arrow Flight
> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
> and data exchange service in portability. How do you see these two things
> relate to each other in the long term?
>
> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>
>> The biggest advantage is actually readability and usability. A secondary
>> advantage is that it means that Go will be able to interact seamlessly with
>> BeamSQL, which would be a big win for Go.
>>
>> A schema is basically a way of saying that a record has a specific set of
>> (possibly nested, possibly repeated) fields. So for instance let's say that
>> the user's type is a struct with fields named user, country, purchaseCost.
>> This allows us to provide transforms that operate on field names. Some
>> example (using the Java API):
>>
>> PCollection users = events.apply(Select.fields("user"));  // Select out
>> only the user field.
>>
>> PCollection joinedEvents =
>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>> PCollections by user.
>>
>> // For each country, calculate the total purchase cost as well as the top
>> 10 purchases.
>> // A new schema is created containing fields total_cost and
>> top_purchases, and rows are created with the aggregation results.
>> PCollection purchaseStatistics = events.apply(
>>     Group.byFieldNames("country")
>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>> "total_cost"))
>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>> "top_purchases"))
>>
>>
>> This is far more readable than what we have today, and what unlocks this
>> is that Beam actually knows the structure of the record instead of assuming
>> records are uncrackable blobs.
>>
>> Note that a coder is basically a special case of a schema that has a
>> single field.
>>
>> In BeamJava we have a SchemaRegistry which knows how to turn user types
>> into schemas. We use reflection to analyze many user types (e.g. simple
>> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to
>> determine the schema, however this is done only when the graph is initially
>> generated. We do use code generation (in Java we do bytecode generation) to
>> make this somewhat more efficient. I'm willing to bet that the code
>> generator you've written for structs could be very easily modified for
>> schemas instead, so it would not be wasted work if we went with schemas.
>>
>> One of the things I'm working on now is documenting Beam schemas. They
>> are already very powerful and useful, but since there is still nothing in
>> our documentation about them, they are not yet widely used. I expect to
>> finish draft documentation by the end of January.
>>
>> Reuven
>>
>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>>
>>> That's an interesting idea. I must confess I don't rightly know the
>>> difference between a schema and coder, but here's what I've got with a bit
>>> of searching through memory and the mailing list. Please let me know if I'm
>>> off track.
>>>
>>> As near as I can tell, a schema, as far as Beam takes it
>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>>> a mechanism to define what data is extracted from a given row of data. So
>>> in principle, there's an opportunity to be more efficient with data with
>>> many columns that aren't being used, and only extract the data that's
>>> meaningful to the pipeline.
>>> The trick then is how to apply the schema to a given serialization
>>> format, which is something I'm missing in my mental model (and then how to
>>> do it efficiently in Go).
>>>
>>> I do know that the Go client package for BigQuery
>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>> something like that, using field tags. Similarly, the "encoding/json"
>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>> Standard Library permits annotating fields and it will read out and
>>> deserialize the JSON fields and that's it.
>>>
>>> A concern I have is that Go (at present) would require pre-compile time
>>> code generation for schemas to be efficient, and they would still mostly
>>> boil down to turning []bytes into real structs. Go reflection doesn't keep
>>> up.
>>> Go has no mechanism I'm aware of to Just In Time compile more efficient
>>> processing of values.
>>> It's also not 100% clear how Schema's would play with protocol buffers
>>> or similar.
>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>> that's only the specification half, not the using half.
>>>
>>> As it stands, the code generator I've been building these last months
>>> could (in principle) statically analyze a user's struct, and then generate
>>> an efficient dedicated coder for it. It just has no where to put them such
>>> that the Go SDK would use it.
>>>
>>>
>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I'll make a different suggestion. There's been some chatter that
>>>> schemas are a better tool than coders, and that in Beam 3.0 we should make
>>>> schemas the basic semantics instead of coders. Schemas provide everything a
>>>> coder provides, but also allows for far more readable code. We can't make
>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in Go
>>>> we're better off starting with schemas instead of coders?
>>>>
>>>> Reuven
>>>>
>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>>>>
>>>>> One area that the Go SDK currently lacks: is the ability for users to
>>>>> specify their own coders for types.
>>>>>
>>>>> I've written a proposal document,
>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>> that require discussion before getting on with the implementation.
>>>>>
>>>>> At presently, the SDK only permits primitive value types (all numeric
>>>>> types but complex, strings, and []bytes) which are coded with beam coders,
>>>>> and structs whose exported fields are of those type, which is then encoded
>>>>> as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer,
>>>>> and presents the current work around this issue.
>>>>>
>>>>> The high level proposal is to catch up with Python and Java, and have
>>>>> a coder registry. In addition, arrays, and maps should be permitted as well.
>>>>>
>>>>> If you have alternatives, or other suggestions and opinions, I'd love
>>>>> to hear them! Otherwise my intent is to get a PR ready by the end of
>>>>> January.
>>>>>
>>>>> Thanks!
>>>>> Robert Burke
>>>>>
>>>>
>>>
>>> --
>>> http://go/where-is-rebo
>>>
>>
>
> --
> Cheers,
> Gleb
>

Re: [Go SDK] User Defined Coders

Posted by Gleb Kanterov <gl...@spotify.com>.
Reuven, it sounds great. I see there is a similar thing to Row coders
happening in Apache Arrow <https://arrow.apache.org>, and there is a
similarity between Apache Arrow Flight
<https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
and data exchange service in portability. How do you see these two things
relate to each other in the long term?

On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:

> The biggest advantage is actually readability and usability. A secondary
> advantage is that it means that Go will be able to interact seamlessly with
> BeamSQL, which would be a big win for Go.
>
> A schema is basically a way of saying that a record has a specific set of
> (possibly nested, possibly repeated) fields. So for instance let's say that
> the user's type is a struct with fields named user, country, purchaseCost.
> This allows us to provide transforms that operate on field names. Some
> example (using the Java API):
>
> PCollection users = events.apply(Select.fields("user"));  // Select out
> only the user field.
>
> PCollection joinedEvents =
> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
> PCollections by user.
>
> // For each country, calculate the total purchase cost as well as the top
> 10 purchases.
> // A new schema is created containing fields total_cost and top_purchases,
> and rows are created with the aggregation results.
> PCollection purchaseStatistics = events.apply(
>     Group.byFieldNames("country")
>                .aggregateField("purchaseCost", Sum.ofLongs(),
> "total_cost"))
>                 .aggregateField("purchaseCost", Top.largestLongs(10),
> "top_purchases"))
>
>
> This is far more readable than what we have today, and what unlocks this
> is that Beam actually knows the structure of the record instead of assuming
> records are uncrackable blobs.
>
> Note that a coder is basically a special case of a schema that has a
> single field.
>
> In BeamJava we have a SchemaRegistry which knows how to turn user types
> into schemas. We use reflection to analyze many user types (e.g. simple
> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to
> determine the schema, however this is done only when the graph is initially
> generated. We do use code generation (in Java we do bytecode generation) to
> make this somewhat more efficient. I'm willing to bet that the code
> generator you've written for structs could be very easily modified for
> schemas instead, so it would not be wasted work if we went with schemas.
>
> One of the things I'm working on now is documenting Beam schemas. They are
> already very powerful and useful, but since there is still nothing in our
> documentation about them, they are not yet widely used. I expect to finish
> draft documentation by the end of January.
>
> Reuven
>
> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:
>
>> That's an interesting idea. I must confess I don't rightly know the
>> difference between a schema and coder, but here's what I've got with a bit
>> of searching through memory and the mailing list. Please let me know if I'm
>> off track.
>>
>> As near as I can tell, a schema, as far as Beam takes it
>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
>> a mechanism to define what data is extracted from a given row of data. So
>> in principle, there's an opportunity to be more efficient with data with
>> many columns that aren't being used, and only extract the data that's
>> meaningful to the pipeline.
>> The trick then is how to apply the schema to a given serialization
>> format, which is something I'm missing in my mental model (and then how to
>> do it efficiently in Go).
>>
>> I do know that the Go client package for BigQuery
>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>> something like that, using field tags. Similarly, the "encoding/json"
>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>> Standard Library permits annotating fields and it will read out and
>> deserialize the JSON fields and that's it.
>>
>> A concern I have is that Go (at present) would require pre-compile time
>> code generation for schemas to be efficient, and they would still mostly
>> boil down to turning []bytes into real structs. Go reflection doesn't keep
>> up.
>> Go has no mechanism I'm aware of to Just In Time compile more efficient
>> processing of values.
>> It's also not 100% clear how Schema's would play with protocol buffers or
>> similar.
>> BigQuery has a mechanism of generating a JSON schema from a proto file
>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>> that's only the specification half, not the using half.
>>
>> As it stands, the code generator I've been building these last months
>> could (in principle) statically analyze a user's struct, and then generate
>> an efficient dedicated coder for it. It just has no where to put them such
>> that the Go SDK would use it.
>>
>>
>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I'll make a different suggestion. There's been some chatter that schemas
>>> are a better tool than coders, and that in Beam 3.0 we should make schemas
>>> the basic semantics instead of coders. Schemas provide everything a coder
>>> provides, but also allows for far more readable code. We can't make such a
>>> change in Beam Java 2.X for compatibility reasons, but maybe in Go we're
>>> better off starting with schemas instead of coders?
>>>
>>> Reuven
>>>
>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>>>
>>>> One area that the Go SDK currently lacks: is the ability for users to
>>>> specify their own coders for types.
>>>>
>>>> I've written a proposal document,
>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>>> while I'm confident about the core, there are certainly some edge cases
>>>> that require discussion before getting on with the implementation.
>>>>
>>>> At presently, the SDK only permits primitive value types (all numeric
>>>> types but complex, strings, and []bytes) which are coded with beam coders,
>>>> and structs whose exported fields are of those type, which is then encoded
>>>> as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer,
>>>> and presents the current work around this issue.
>>>>
>>>> The high level proposal is to catch up with Python and Java, and have a
>>>> coder registry. In addition, arrays, and maps should be permitted as well.
>>>>
>>>> If you have alternatives, or other suggestions and opinions, I'd love
>>>> to hear them! Otherwise my intent is to get a PR ready by the end of
>>>> January.
>>>>
>>>> Thanks!
>>>> Robert Burke
>>>>
>>>
>>
>> --
>> http://go/where-is-rebo
>>
>

-- 
Cheers,
Gleb

Re: [Go SDK] User Defined Coders

Posted by Reuven Lax <re...@google.com>.
The biggest advantage is actually readability and usability. A secondary
advantage is that it means that Go will be able to interact seamlessly with
BeamSQL, which would be a big win for Go.

A schema is basically a way of saying that a record has a specific set of
(possibly nested, possibly repeated) fields. So for instance let's say that
the user's type is a struct with fields named user, country, purchaseCost.
This allows us to provide transforms that operate on field names. Some
example (using the Java API):

PCollection users = events.apply(Select.fields("user"));  // Select out
only the user field.

PCollection joinedEvents =
queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
PCollections by user.

// For each country, calculate the total purchase cost as well as the top
10 purchases.
// A new schema is created containing fields total_cost and top_purchases,
and rows are created with the aggregation results.
PCollection purchaseStatistics = events.apply(
    Group.byFieldNames("country")
               .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost"))
                .aggregateField("purchaseCost", Top.largestLongs(10),
"top_purchases"))


This is far more readable than what we have today, and what unlocks this is
that Beam actually knows the structure of the record instead of assuming
records are uncrackable blobs.

Note that a coder is basically a special case of a schema that has a single
field.

In BeamJava we have a SchemaRegistry which knows how to turn user types
into schemas. We use reflection to analyze many user types (e.g. simple
POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to
determine the schema, however this is done only when the graph is initially
generated. We do use code generation (in Java we do bytecode generation) to
make this somewhat more efficient. I'm willing to bet that the code
generator you've written for structs could be very easily modified for
schemas instead, so it would not be wasted work if we went with schemas.

One of the things I'm working on now is documenting Beam schemas. They are
already very powerful and useful, but since there is still nothing in our
documentation about them, they are not yet widely used. I expect to finish
draft documentation by the end of January.

Reuven

On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <re...@google.com> wrote:

> That's an interesting idea. I must confess I don't rightly know the
> difference between a schema and coder, but here's what I've got with a bit
> of searching through memory and the mailing list. Please let me know if I'm
> off track.
>
> As near as I can tell, a schema, as far as Beam takes it
> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java> is
> a mechanism to define what data is extracted from a given row of data. So
> in principle, there's an opportunity to be more efficient with data with
> many columns that aren't being used, and only extract the data that's
> meaningful to the pipeline.
> The trick then is how to apply the schema to a given serialization format,
> which is something I'm missing in my mental model (and then how to do it
> efficiently in Go).
>
> I do know that the Go client package for BigQuery
> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
> something like that, using field tags. Similarly, the "encoding/json"
> <https://golang.org/doc/articles/json_and_go.html> package in the Go
> Standard Library permits annotating fields and it will read out and
> deserialize the JSON fields and that's it.
>
> A concern I have is that Go (at present) would require pre-compile time
> code generation for schemas to be efficient, and they would still mostly
> boil down to turning []bytes into real structs. Go reflection doesn't keep
> up.
> Go has no mechanism I'm aware of to Just In Time compile more efficient
> processing of values.
> It's also not 100% clear how Schema's would play with protocol buffers or
> similar.
> BigQuery has a mechanism of generating a JSON schema from a proto file
> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but that's
> only the specification half, not the using half.
>
> As it stands, the code generator I've been building these last months
> could (in principle) statically analyze a user's struct, and then generate
> an efficient dedicated coder for it. It just has no where to put them such
> that the Go SDK would use it.
>
>
> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>
>> I'll make a different suggestion. There's been some chatter that schemas
>> are a better tool than coders, and that in Beam 3.0 we should make schemas
>> the basic semantics instead of coders. Schemas provide everything a coder
>> provides, but also allows for far more readable code. We can't make such a
>> change in Beam Java 2.X for compatibility reasons, but maybe in Go we're
>> better off starting with schemas instead of coders?
>>
>> Reuven
>>
>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>>
>>> One area that the Go SDK currently lacks: is the ability for users to
>>> specify their own coders for types.
>>>
>>> I've written a proposal document,
>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>>> while I'm confident about the core, there are certainly some edge cases
>>> that require discussion before getting on with the implementation.
>>>
>>> At presently, the SDK only permits primitive value types (all numeric
>>> types but complex, strings, and []bytes) which are coded with beam coders,
>>> and structs whose exported fields are of those type, which is then encoded
>>> as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer,
>>> and presents the current work around this issue.
>>>
>>> The high level proposal is to catch up with Python and Java, and have a
>>> coder registry. In addition, arrays, and maps should be permitted as well.
>>>
>>> If you have alternatives, or other suggestions and opinions, I'd love to
>>> hear them! Otherwise my intent is to get a PR ready by the end of January.
>>>
>>> Thanks!
>>> Robert Burke
>>>
>>
>
> --
> http://go/where-is-rebo
>

Re: [Go SDK] User Defined Coders

Posted by Robert Burke <re...@google.com>.
That's an interesting idea. I must confess I don't rightly know the
difference between a schema and coder, but here's what I've got with a bit
of searching through memory and the mailing list. Please let me know if I'm
off track.

As near as I can tell, a schema, as far as Beam takes it
<https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java>
is
a mechanism to define what data is extracted from a given row of data. So
in principle, there's an opportunity to be more efficient with data with
many columns that aren't being used, and only extract the data that's
meaningful to the pipeline.
The trick then is how to apply the schema to a given serialization format,
which is something I'm missing in my mental model (and then how to do it
efficiently in Go).

I do know that the Go client package for BigQuery
<https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does something
like that, using field tags. Similarly, the "encoding/json"
<https://golang.org/doc/articles/json_and_go.html> package in the Go
Standard Library permits annotating fields and it will read out and
deserialize the JSON fields and that's it.

A concern I have is that Go (at present) would require pre-compile time
code generation for schemas to be efficient, and they would still mostly
boil down to turning []bytes into real structs. Go reflection doesn't keep
up.
Go has no mechanism I'm aware of to Just In Time compile more efficient
processing of values.
It's also not 100% clear how Schema's would play with protocol buffers or
similar.
BigQuery has a mechanism of generating a JSON schema from a proto file
<https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but that's
only the specification half, not the using half.

As it stands, the code generator I've been building these last months could
(in principle) statically analyze a user's struct, and then generate an
efficient dedicated coder for it. It just has no where to put them such
that the Go SDK would use it.


On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:

> I'll make a different suggestion. There's been some chatter that schemas
> are a better tool than coders, and that in Beam 3.0 we should make schemas
> the basic semantics instead of coders. Schemas provide everything a coder
> provides, but also allows for far more readable code. We can't make such a
> change in Beam Java 2.X for compatibility reasons, but maybe in Go we're
> better off starting with schemas instead of coders?
>
> Reuven
>
> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:
>
>> One area that the Go SDK currently lacks: is the ability for users to
>> specify their own coders for types.
>>
>> I've written a proposal document,
>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
>> while I'm confident about the core, there are certainly some edge cases
>> that require discussion before getting on with the implementation.
>>
>> At presently, the SDK only permits primitive value types (all numeric
>> types but complex, strings, and []bytes) which are coded with beam coders,
>> and structs whose exported fields are of those type, which is then encoded
>> as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer,
>> and presents the current work around this issue.
>>
>> The high level proposal is to catch up with Python and Java, and have a
>> coder registry. In addition, arrays, and maps should be permitted as well.
>>
>> If you have alternatives, or other suggestions and opinions, I'd love to
>> hear them! Otherwise my intent is to get a PR ready by the end of January.
>>
>> Thanks!
>> Robert Burke
>>
>

-- 
http://go/where-is-rebo

Re: [Go SDK] User Defined Coders

Posted by Reuven Lax <re...@google.com>.
I'll make a different suggestion. There's been some chatter that schemas
are a better tool than coders, and that in Beam 3.0 we should make schemas
the basic semantics instead of coders. Schemas provide everything a coder
provides, but also allows for far more readable code. We can't make such a
change in Beam Java 2.X for compatibility reasons, but maybe in Go we're
better off starting with schemas instead of coders?

Reuven

On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <ro...@frantil.com> wrote:

> One area that the Go SDK currently lacks: is the ability for users to
> specify their own coders for types.
>
> I've written a proposal document,
> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#> and
> while I'm confident about the core, there are certainly some edge cases
> that require discussion before getting on with the implementation.
>
> At presently, the SDK only permits primitive value types (all numeric
> types but complex, strings, and []bytes) which are coded with beam coders,
> and structs whose exported fields are of those type, which is then encoded
> as JSON. Protocol buffer support is hacked in to avoid the type anaiyzer,
> and presents the current work around this issue.
>
> The high level proposal is to catch up with Python and Java, and have a
> coder registry. In addition, arrays, and maps should be permitted as well.
>
> If you have alternatives, or other suggestions and opinions, I'd love to
> hear them! Otherwise my intent is to get a PR ready by the end of January.
>
> Thanks!
> Robert Burke
>