Posted to dev@beam.apache.org by Reuven Lax <re...@google.com> on 2018/08/29 05:40:10 UTC

Beam Schemas: current status

I wanted to send a quick note to the community about the current status of
schema-aware PCollections in Beam. As some might remember, we had a good
discussion last year about the design of these schemas, involving many
folks from different parts of the community. I sent a summary earlier this
year explaining how schemas have been integrated into the DoFn framework.
Much has happened since then, and here are some of the highlights.

First, I want to emphasize that all the schema-aware classes are currently
marked @Experimental. Nothing is set in stone yet, so if you have questions
about any decisions made, please start a discussion!

SQL

The first big milestone for schemas was porting all of BeamSQL to use the
framework, which was done in pr/5956. This was a lot of work and exposed
many bugs in the schema implementation, but it now provides great evidence
that schemas work!

Schema inference

Beam can automatically infer schemas from Java POJOs (objects with public
fields) or JavaBean objects (objects with getter/setter methods). Often you
can do this by simply annotating the class. For example:

@DefaultSchema(JavaFieldSchema.class)
public class UserEvent {
  public String userId;
  public LatLong location;
  public String countryCode;
  public long transactionCost;
  public double transactionDuration;
  public List<String> traceMessages;
}

@DefaultSchema(JavaFieldSchema.class)
public class LatLong {
  public double latitude;
  public double longitude;
}

Beam will automatically infer schemas for these classes! So if you have a
PCollection<UserEvent>, it will automatically get the following schema:

UserEvent:
  userId: STRING
  location: ROW(LatLong)
  countryCode: STRING
  transactionCost: INT64
  transactionDuration: DOUBLE
  traceMessages: ARRAY[STRING]

LatLong:
  latitude: DOUBLE
  longitude: DOUBLE

Now it’s not always possible to annotate the class like this (you may not
own the class definition), so you can also explicitly register the class
using Pipeline.getSchemaRegistry().registerPOJO, and the same for JavaBeans.
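
For illustration, a minimal registration sketch, assuming the entry points
named above (the JavaBean variant is shown as the analogous registerJavaBean
call; the bean type is hypothetical):

Pipeline pipeline = Pipeline.create(options);
// Register schemas for classes we can't annotate ourselves.
pipeline.getSchemaRegistry().registerPOJO(UserEvent.class);
pipeline.getSchemaRegistry().registerPOJO(LatLong.class);
// For getter/setter-style classes (hypothetical bean type):
// pipeline.getSchemaRegistry().registerJavaBean(UserEventBean.class);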

Coders

Beam has a built-in coder for any schema-aware PCollection, largely
removing the need for users to care about coders. We generate low-level
bytecode (using ByteBuddy) to implement the coder for each schema, so these
coders are quite performant. This provides a better default coder for Java
POJO objects as well. In the past, users were advised to use AvroCoder
for POJOs, which many have found inefficient. Now there’s a more-efficient
solution.

Utility Transforms

Schemas are already useful for implementers of extensions such as SQL, but
the goal was to use them to make Beam itself easier to use. To this end,
I’ve been implementing a library of transforms that allow for easy
manipulation of schema PCollections. So far Filter and Select are merged,
Group is about to go out for review (it needs some more javadoc and unit
tests), and Join is being developed but doesn’t yet have a final interface.

Filter

Given a PCollection<LatLong>, I want to keep only those in an area of
southern Manhattan. Well, this is easy!

PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
    .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));

Schemas, along with lambdas, allow us to write this transform declaratively.
The Filter transform also allows you to register filter functions that
operate on multiple fields at the same time.
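
For example, a sketch of a multi-field filter; the whereFieldNames
signature shown here is illustrative, not the final API:

PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
    .whereFieldNames(Arrays.asList("latitude", "longitude"),
        // The predicate sees a Row holding just the named fields.
        row -> row.getDouble("latitude") < 40.720
            && row.getDouble("latitude") > 40.699
            && row.getDouble("longitude") < -73.969
            && row.getDouble("longitude") > -74.747));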

Select

Let’s say that I don’t need all the fields in a row. For instance, I’m only
interested in the userId and traceMessages, and don’t care about the
location. In that case I can write the following:

PCollection<Row> selected =
    allEvents.apply(Select.fieldNames("userId", "traceMessages"));


BTW, Beam also keeps track of which fields are accessed by a transform. In
the future we can automatically insert Selects in front of subgraphs to
drop fields that are not referenced in that subgraph.

Group

Group is one of the more advanced transforms. In its most basic form, it
provides a convenient way to group by key:

PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
    allEvents.apply(Group.byFieldNames("userId", "countryCode"));

Notice how much more concise this is than using GroupByKey directly!
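
For comparison, a rough sketch of the same grouping written by hand with
GroupByKey (the key schema is illustrative, and coders for Row and
UserEvent are assumed to resolve):

Schema keySchema = Schema.builder()
    .addStringField("userId")
    .addStringField("countryCode")
    .build();

PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry = allEvents
    .apply(MapElements.via(new SimpleFunction<UserEvent, KV<Row, UserEvent>>() {
      @Override
      public KV<Row, UserEvent> apply(UserEvent e) {
        // Build the composite key explicitly; Group.byFieldNames does this for us.
        Row key = Row.withSchema(keySchema)
            .addValues(e.userId, e.countryCode)
            .build();
        return KV.of(key, e);
      }
    }))
    .apply(GroupByKey.create());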

The Group transform really starts to shine, however, when you start
specifying aggregations. You can aggregate any field (or fields) and build
up an output schema based on these aggregations. For example:

PCollection<KV<Row, Row>> aggregated = allEvents.apply(
    Group.byFieldNames("userId", "countryCode")
        .aggregateField("cost", Sum.ofLongs(), "total_cost")
        .aggregateField("cost", Top.<Long>largestFn(10), "top_purchases")
        .aggregateField("transactionDuration",
            ApproximateQuantilesCombineFn.create(21), "durationHistogram"));

This will individually aggregate the specified fields of the input items
(by user and country), and generate an output schema for these
aggregations. In this case, the output schema will be the following:

AggregatedSchema:
  total_cost: INT64
  top_purchases: ARRAY[INT64]
  durationHistogram: ARRAY[DOUBLE]
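
A sketch of consuming the result: each element is a KV of the key Row
(userId, countryCode) and an aggregate Row whose fields are read by name;
the field accessors follow the Row API, and the output format here is
just for illustration:

aggregated.apply(ParDo.of(new DoFn<KV<Row, Row>, String>() {
  @ProcessElement
  public void process(ProcessContext c) {
    Row key = c.element().getKey();
    Row aggs = c.element().getValue();
    c.output(key.getString("userId") + "/" + key.getString("countryCode")
        + " total_cost=" + aggs.getInt64("total_cost"));
  }
}));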

There are some more utility transforms I've written that are worth looking
at, such as Convert (which can convert between user types that share a
schema) and Unnest (which flattens nested schemas). There are also some
others, such as Pivot, that we should consider writing.
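
A hedged sketch of Convert usage (UserEventBean is a hypothetical JavaBean
sharing UserEvent's schema; Unnest, not shown, flattens nested fields like
location.latitude into top-level fields):

// Convert between two user types that share a schema:
PCollection<UserEventBean> beans =
    allEvents.apply(Convert.to(UserEventBean.class));
// Or convert any schema-aware PCollection to generic Rows:
PCollection<Row> rows = allEvents.apply(Convert.toRows());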

There is still a lot to do. All the todo items are reflected in JIRA;
however, here are some examples of current gaps:

   - Support for read-only POJOs (those with final fields) and JavaBeans
   (objects without setters).

   - Automatic schema inference from more Java types: protocol buffers,
   Avro, AutoValue, etc.

   - Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).

   - Support for JsonPath expressions so users can better express nested
   fields, e.g. expressions of the form Select.fields("field1.field2",
   "field3.*", "field4[0].field5");

   - Schemas still need to be defined in our portability layer so they can
   be used across languages.

If anyone is interested in helping close these gaps, you'll be helping make
Beam a better, more-usable system!

Reuven

Re: Beam Schemas: current status

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Aug 30, 2018 at 5:15 PM Reuven Lax <re...@google.com> wrote:

> Some answers inline:
> On Thu, Aug 30, 2018 at 7:56 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Thanks Reuven for the excellent summary and thanks to all the guys who
>> worked on the Schema/SQL improvements. This is great for usability. I
>> really like the idea of making the user experience simpler, e.g. by
>> automatically inferring Coders. Some questions:
>>
>> - Any plans to add similar improvements for the Python/Go SDKs?
>>
>
> Yes, all languages should support this. I'm focusing on Java first.
>

Python has native tuples and dictionaries which make handling records like
this less painful than it was in Java before Schemas, and the average user
rarely deals with coders, but I've been thinking about this for a while and
I still think there's a lot more we could do here. (E.g. when encoding
dictionaries, we must encode the key name for every element. If we knew
that every element pulled from the same keyset we could introduce a much
more efficient coder. The lack of static typing makes it harder to perform
optimizations like this.)

When we finally decide to do this, I think we should seriously consider
closely following the Pandas APIs, which have become a kind of de facto
standard for writing these kinds of transformations.

>> - I suppose that deconstructing the element objects based on
>> annotations happens in flight; have you thought about a way to
>> eventually push this into the previous transform (e.g. the IO) via
>> some sort of push-down predicate? (I suppose this applies to
>> JsonPath filters but should be more complex.)
>>
>
> I assume you mean turning a user element into something accessible by
> fields? For Pojos/JavaBeans we never "deconstruct" the elements. ByteBuddy
> is used to generate low-level field accessors for these objects. So
> internally the system can turn a Pojo into a Row object very cheaply, at
> the cost of a single object allocation. This also means that the memory
> representation for this object remains the user's Pojo.
>
> There is opportunity to push some things down, though. Since we can analyze
> the graph and determine which fields are being accessed (often), we could
> insert projections early in the graph that drop unused fields.
>
>
>> - Do you think it makes sense to have ways for IOs to provide
>> transforms to convert from/to Rows? I remember there was some work on
>> this for the SQL. I am wondering how we can make the Schema/SQL
>> experience even more friendly.
>>
>
> Yes, absolutely. Transforms aren't needed - the IOs need only register
> SerializableFunctions to do this conversion with the SchemaRegistry (if the
> schema is registered, there is a generic Convert transform that can always
> convert between any two types with the same Schema, including Rows). This
> will make these IOs much friendlier to use.
>
>>
>> - What is the current intermediate representation? If I remember well,
>> weren’t Coders in part a way of hacking around the issues of
>> determinism in Java serialization? So if we use Java serialization
>> (generated via ByteBuddy), wouldn’t we have similar issues?
>>
>
> In addition to determinism issues, Java Serialization is an incredibly
> inefficient encoding mechanism (in some cases the serialized size of a
> record can be 100x larger). Java serialization isn't being used. Right now
> ByteBuddy is generating a nested coder that uses the existing Beam coders
> (e.g. StringUtf8Coder, VarLongCoder, etc.).
>
>>
>> - Have you envisioned other ways to serialize apart from the generation
>> via ByteBuddy? E.g. to make Row compatible with formats supported in
>> multiple languages, e.g. protobuf, or some Arrow-like thing that we can
>> just submit without reserialization and can be decoded back (this will
>> be great for portability).
>>
>
> Yes, I looked at Arrow and Flatbuffers, but I haven't had time to actually
> play with them yet. One problem with Arrow is that Beam has a
> record-at-a-time model, and Arrow is better suited for encoding batches of
> records.
>
> It's worth noting that in our portability model, all of these Beam coders
> will have a URN and an external spec as to how they encode - so the current
> coder should also be compatible across languages. A big advantage of
> formats such as proto or flatbuffers is that they are already designed to
> support schema evolution (e.g. updating your schema with new fields) in a
> compatible way.
>
> Another thing to think about: an encoding method that allows us to
> efficiently implement Select without parsing the whole record would be
> nice. There is a body of research in the DB world about good encoding
> methods for records.
>
> One idea: in the future the FnApi can natively represent schemas, which
> means that we won't need a coder at all. The portability layer can then
> choose its own encoding mechanism, and since FnApi can operate on batches,
> Arrow might be used to encode these batches efficiently.
>

+1. In Python (and probably even in Java), many of these "schema-aware"
operations can be performed more efficiently on batches than on individual
records.


>
>
>> - Any discussions on Row ↔ JSON conversion? Sounds like a trivial /
>> common case too.
>>
>
> Should be an easy transform to add. It would also be nice to support
> JsonSchema as a way of reading schemas.
>
>
>>
>> Regards,
>> Ismael

Re: Beam Schemas: current status

Posted by Reuven Lax <re...@google.com>.
Some answers inline:
On Thu, Aug 30, 2018 at 7:56 AM Ismaël Mejía <ie...@gmail.com> wrote:

> Thanks Reuven for the excellent summary and thanks to all the guys who
> worked on the Schema/SQL improvements. This is great for usability. I
> really like the idea of making the user experience simpler, e.g. by
> automatically inferring Coders. Some questions:
>
> - Any plans to add similar improvements for the Python/Go SDKs?
>

Yes, all languages should support this. I'm focusing on Java first.


>
> - I suppose that deconstructing the element objects based on
> annotations happens in flight; have you thought about a way to
> eventually push this into the previous transform (e.g. the IO) via
> some sort of push-down predicate? (I suppose this applies to
> JsonPath filters but should be more complex.)
>

I assume you mean turning a user element into something accessible by
fields? For Pojos/JavaBeans we never "deconstruct" the elements. ByteBuddy
is used to generate low-level field accessors for these objects. So
internally the system can turn a Pojo into a Row object very cheaply, at
the cost of a single object allocation. This also means that the memory
representation for this object remains the user's Pojo.
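
A conceptual illustration (not Beam's actual generated code) of such a
cheap Pojo-to-Row view: one wrapper allocation, fields read on demand, no
copying; Beam generates comparable accessors with ByteBuddy:

class UserEventRowView {
  private final UserEvent pojo;  // the only allocation is this wrapper

  UserEventRowView(UserEvent pojo) { this.pojo = pojo; }

  Object getValue(int fieldIndex) {
    switch (fieldIndex) {
      case 0: return pojo.userId;
      case 1: return pojo.location;       // nested LatLong becomes a nested row
      case 2: return pojo.countryCode;
      case 3: return pojo.transactionCost;
      case 4: return pojo.transactionDuration;
      case 5: return pojo.traceMessages;
      default: throw new IndexOutOfBoundsException();
    }
  }
}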

There is opportunity to push some things down, though. Since we can analyze
the graph and determine which fields are being accessed (often), we could
insert projections early in the graph that drop unused fields.


> - Do you think it makes sense to have ways for IOs to provide
transforms to convert from/to Rows? I remember there was some work on
> this for the SQL. I am wondering how we can make the Schema/SQL
> experience even more friendly.
>

Yes, absolutely. Transforms aren't needed - the IOs need only register
SerializableFunctions to do this conversion with the SchemaRegistry (if the
schema is registered, there is a generic Convert transform that can always
convert between any two types with the same Schema, including Rows). This
will make these IOs much friendlier to use.
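
A sketch of how an IO might register such conversion functions, assuming a
registration hook along these lines on SchemaRegistry (TableRecord and its
schema are hypothetical stand-ins for an IO's native type):

Schema recordSchema = Schema.builder()
    .addStringField("id")
    .addInt64Field("value")
    .build();

pipeline.getSchemaRegistry().registerSchemaForClass(
    TableRecord.class,
    recordSchema,
    record -> Row.withSchema(recordSchema)            // toRow function
        .addValues(record.getId(), record.getValue()).build(),
    row -> new TableRecord(                           // fromRow function
        row.getString("id"), row.getInt64("value")));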

>
> - What is the current intermediate representation? If I remember well,
> weren’t Coders in part a way of hacking around the issues of
> determinism in Java serialization? So if we use Java serialization
> (generated via ByteBuddy), wouldn’t we have similar issues?
>

In addition to determinism issues, Java Serialization is an incredibly
inefficient encoding mechanism (in some cases the serialized size of a
record can be 100x larger). Java serialization isn't being used. Right now
ByteBuddy is generating a nested coder that uses the existing Beam coders
(e.g. StringUtf8Coder, VarLongCoder, etc.).
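
A conceptual sketch of what the generated coder does: encode each field in
schema order with an existing Beam coder (the real generated code avoids
reflection and also handles the nested LatLong row):

void encodeUserEvent(UserEvent e, OutputStream out) throws IOException {
  StringUtf8Coder.of().encode(e.userId, out);
  // ... nested row coder for e.location ...
  StringUtf8Coder.of().encode(e.countryCode, out);
  VarLongCoder.of().encode(e.transactionCost, out);
  DoubleCoder.of().encode(e.transactionDuration, out);
  ListCoder.of(StringUtf8Coder.of()).encode(e.traceMessages, out);
}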

>
> - Have you envisioned other ways to serialize apart from the generation
> via ByteBuddy? E.g. to make Row compatible with formats supported in
> multiple languages, e.g. protobuf, or some Arrow-like thing that we can
> just submit without reserialization and can be decoded back (this will
> be great for portability).
>

Yes, I looked at Arrow and Flatbuffers, but I haven't had time to actually
play with them yet. One problem with Arrow is that Beam has a
record-at-a-time model, and Arrow is better suited for encoding batches of
records.

It's worth noting that in our portability model, all of these Beam coders
will have a URN and an external spec as to how they encode - so the current
coder should also be compatible across languages. A big advantage of
formats such as proto or flatbuffers is that they are already designed to
support schema evolution (e.g. updating your schema with new fields) in a
compatible way.

Another thing to think about: an encoding method that allows us to
efficiently implement Select without parsing the whole record would be
nice. There is a body of research in the DB world about good encoding
methods for records.

One idea: in the future the FnApi can natively represent schemas, which
means that we won't need a coder at all. The portability layer can then
choose its own encoding mechanism, and since FnApi can operate on batches,
Arrow might be used to encode these batches efficiently.


> - Any discussions on Row ↔ JSON conversion? Sounds like a trivial /
> common case too.
>

Should be an easy transform to add. It would also be nice to support
JsonSchema as a way of reading schemas.
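
A rough sketch of the Row-to-JSON direction, walking the schema's fields (a
real transform would handle nested rows, arrays, escaping, and non-string
types; this just shows why the mapping is mechanical):

static String rowToJson(Row row) {
  StringBuilder json = new StringBuilder("{");
  List<Schema.Field> fields = row.getSchema().getFields();
  for (int i = 0; i < fields.size(); i++) {
    if (i > 0) json.append(",");
    // Emit every value as a quoted string, keyed by the schema field name.
    json.append('"').append(fields.get(i).getName()).append("\":\"")
        .append(row.getValue(i)).append('"');
  }
  return json.append("}").toString();
}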


>
> Regards,
> Ismael

Re: Beam Schemas: current status

Posted by Ismaël Mejía <ie...@gmail.com>.
Thanks Reuven for the excellent summary and thanks to all the guys who
worked on the Schema/SQL improvements. This is great for usability. I
really like the idea of making the user experience simpler, e.g. by
automatically inferring Coders. Some questions:

- Any plans to add similar improvements for the Python/Go SDKs?

- I suppose that deconstructing the element objects based on
annotations happens in flight; have you thought about a way to
eventually push this into the previous transform (e.g. the IO) via
some sort of push-down predicate? (I suppose this applies to
JsonPath filters but should be more complex.)

- Do you think it makes sense to have ways for IOs to provide
transforms to convert from/to Rows? I remember there was some work on
this for the SQL. I am wondering how we can make the Schema/SQL
experience even more friendly.

- What is the current intermediate representation? If I remember well,
weren’t Coders in part a way of hacking around the issues of
determinism in Java serialization? So if we use Java serialization
(generated via ByteBuddy), wouldn’t we have similar issues?

- Have you envisioned other ways to serialize apart from the generation
via ByteBuddy? E.g. to make Row compatible with formats supported in
multiple languages, e.g. protobuf, or some Arrow-like thing that we can
just submit without reserialization and can be decoded back (this will
be great for portability).

- Any discussions on Row ↔ JSON conversion? Sounds like a trivial /
common case too.

Regards,
Ismael

Re: Beam Schemas: current status

Posted by Reuven Lax <re...@google.com>.
Andrew - the @Experimental tag simply means that we are free to change the
interfaces without waiting for the next major Beam version. Once we are
happy to freeze these interfaces, we can drop the tag.

On Wed, Aug 29, 2018 at 1:48 PM Andrew Pilloud <ap...@google.com> wrote:

> The work you've done to generalize and expand Schemas has significantly
> simplified what we need to do for SQL; I hope they are valuable to everyone
> else too. What work remains before we can drop the Experimental designation?
>
> Andrew

Re: Beam Schemas: current status

Posted by Andrew Pilloud <ap...@google.com>.
The work you've done to generalize and expand Schemas has significantly
simplified what we need to do for SQL; I hope they are valuable to everyone
else too. What work remains before we can drop the Experimental designation?

Andrew

On Wed, Aug 29, 2018 at 5:31 AM Eugene Kirpichov <ki...@google.com>
wrote:

> Wow, this is really coming together, congratulations and thanks for the
> great work!
>
> On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax <re...@google.com> wrote:
>
>> I wanted to send a quick note to the community about the current status
>> of schema-aware PCollections in Beam. As some might remember we had a good
>> discussion last year about the design of these schemas, involving many
>> folks from different parts of the community. I sent a summary earlier this
>> year explaining how schemas has been integrated into the DoFn framework.
>> Much has happened since then, and here are some of the highlights.
>>
>> First, I want to emphasize that all the schema-aware classes are
>> currently marked @Experimental. Nothing is set in stone yet, so if you have
>> questions about any decisions made, please start a discussion!
>>
>> SQL
>>
>> The first big milestone for schemas was porting all of BeamSQL to use the
>> framework, which was done in pr/5956. This was a lot of work, exposed many
>> bugs in the schema implementation, but now provides great evidence that
>> schemas work!
>>
>> Schema inference
>>
>> Beam can automatically infer schemas from Java POJOs (objects with public
>> fields) or JavaBean objects (objects with getter/setter methods). Often you
>> can do this by simply annotating the class. For example:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>>
>> public class UserEvent {
>>
>>  public String userId;
>>
>>  public LatLong location;
>>
>>  Public String countryCode;
>>
>>  public long transactionCost;
>>
>>  public double transactionDuration;
>>
>>  public List<String> traceMessages;
>>
>> };
>>
>> @DefaultSchema(JavaFieldSchema.class)
>>
>> public class LatLong {
>>
>>  public double latitude;
>>
>>  public double longitude;
>>
>> }
>>
>> Beam will automatically infer schemas for these classes! So if you have a
>> PCollection<UserEvent>, it will automatically get the following schema:
>>
>> UserEvent:
>>
>>  userId: STRING
>>
>>  location: ROW(LatLong)
>>
>>  countryCode: STRING
>>
>>  transactionCost: INT64
>>
>>  transactionDuration: DOUBLE
>>
>>  traceMessages: ARRAY[STRING]]
>>
>>
>> LatLong:
>>
>>  latitude: DOUBLE
>>
>>  longitude: DOUBLE
>>
>> Now it’s not always possible to annotate the class like this (you may not
>> own the class definition), so you can also explicitly register this using
>> Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans.
>>
>> Coders
>>
>> Beam has a built-in coder for any schema-aware PCollection, largely
>> removing the need for users to care about coders. We generate low-level
>> bytecode (using ByteBuddy) to implement the coder for each schema, so these
>> coders are quite performant. This provides a better default coder for Java
>> POJO objects as well. In the past users were recommended to use AvroCoder
>> for pojos, which many have found inefficient. Now there’s a more-efficient
>> solution.
>>
>> Utility Transforms
>>
>> Schemas are already useful for implementers of extensions such as SQL,
>> but the goal was to use them to make Beam itself easier to use. To this
>> end, I’ve been implementing a library of transforms that allow for easy
>> manipulation of schema PCollections. So far Filter and Select are merged,
>> Group is about to go out for review (it needs some more javadoc and unit
>> tests), and Join is being developed but doesn’t yet have a final interface.
>>
>> Filter
>>
>> Given a PCollection<LatLong>, I want to keep only those in an area of
>> southern manhattan. Well this is easy!
>>
>> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>>
>>  .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>>
>>  .whereFieldName("longitude", long -> long < -73.969 && long > -74.747));
>>
>> Schemas along with lambdas allows us to write this transform
>> declaratively. The Filter transform also allows you to register filter
>> functions that operate on multiple fields at the same time.
>>
>> Select
>>
>> Let’s say that I don’t need all the fields in a row. For instance, I’m
>> only interested in the userId and traceMessages, and don’t care about the
>> location. In that case I can write the following:
>>
>> PCollection<Row> selected = allEvents.apply(Select.fieldNames(“userId”, “
>> traceMessages”));
>>
>>
>> BTW, Beam also keeps track of which fields are accessed by a transform In
>> the future we can automatically insert Selects in front of subgraphs to
>> drop fields that are not referenced in that subgraph.
>>
>> Group
>>
>> Group is one of the more advanced transforms. In its most basic form, it
>> provides a convenient way to group by key:
>>
>> PCollection<KV<Row, Iterable<UserEvent>> byUserAndCountry =
>>
>>    allEvents.apply(Group.byFieldNames(“userId”, “countryCode”));
>>
>> Notice how much more concise this is than using GroupByKey directly!
>>
>> The Group transform really starts to shine however when you start
>> specifying aggregations. You can aggregate any field (or fields) and build
>> up an output schema based on these aggregations. For example:
>>
>> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>>
>>    Group.byFieldNames(“userId”, “countryCode”)
>>
>>        .aggregateField("cost", Sum.ofLongs(), "total_cost")
>>
>>        .aggregateField("cost", Top.<Long>largestFn(10), “top_purchases”)
>>
>>        .aggregateField("transationDuration",
>> ApproximateQuantilesCombineFn.create(21),
>>
>>              “durationHistogram”)));
>>
>> This will individually aggregate the specified fields of the input items
>> (by user and country), and generate an output schema for these
>> aggregations. In this case, the output schema will be the following:
>>
>> AggregatedSchema:
>>
>>    total_cost: INT64
>>
>>    top_purchases: ARRAY[INT64]
>>
>>    durationHistogram: ARRAY[DOUBLE]
>>
>> There are some more utility transforms I've written that are worth
>> looking at, such as Convert (which can convert between user types that
>> share a schema) and Unnest (which flattens nested schemas). There are also
>> some others, such as Pivot, that we should consider writing.
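>>
>> For example, Convert can round-trip through Row, something like this
>> (a sketch; userEvents is a PCollection<UserEvent>, and toRows/to are the
>> assumed entry points):
>>
>> PCollection<Row> rows = userEvents.apply(Convert.toRows());
>>
>> PCollection<UserEvent> back = rows.apply(Convert.to(UserEvent.class));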
>>
>> There is still a lot to do. All the todo items are reflected in JIRA;
>> however, here are some examples of current gaps:
>>
>>    - Support for read-only POJOs (those with final fields) and JavaBeans
>>    (objects without setters).
>>
>>    - Automatic schema inference from more Java types: protocol buffers,
>>    Avro, AutoValue, etc.
>>
>>    - Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).
>>
>>    - Support for JsonPath expressions so users can better express nested
>>    fields, e.g. expressions of the form Select.fields("field1.field2",
>>    "field3.*", "field4[0].field5").
>>
>>    - Schemas still need to be defined in our portability layer so they can
>>    be used cross-language.
>>
>>
>> If anyone is interested in helping close these gaps, you'll be helping
>> make Beam a better, more-usable system!
>>
>> Reuven
>>
>>

Re: Beam Schemas: current status

Posted by Eugene Kirpichov <ki...@google.com>.
Wow, this is really coming together, congratulations and thanks for the
great work!


Re: Beam Schemas: current status

Posted by Maximilian Michels <mx...@apache.org>.
Good point with identical types. You definitely want to avoid the following:

class Pojo {
   final String param1;
   final String param2;

   Pojo(String param2, String param1) {
     this.param1 = param1;
     this.param2 = param2;
   }
}

This would silently swap param1 and param2 after deserialization. So this
should only do its magic if there is only one possible way to feed data to
the constructor. That's why a dedicated interface would be the easier and
safer way to opt-in.
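
For example, something like this (purely hypothetical; no such annotation
exists today) would make the opt-in explicit:

class Pojo {
   final String param1;
   final String param2;

   @SchemaConstructor  // hypothetical opt-in marker
   Pojo(String param1, String param2) {
     this.param1 = param1;
     this.param2 = param2;
   }
}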

On 31.08.18 11:27, Robert Bradshaw wrote:
> On Fri, Aug 31, 2018 at 11:22 AM Maximilian Michels <mx...@apache.org> wrote:
> 
>     Thanks Reuven. That's an OK restriction. Apache Flink also requires
>     non-final fields to be able to generate TypeInformation (~=Schema) from
>     PoJos.
> 
>     I agree that it's not very intuitive for Users.
> 
>     I suppose it would work to assume a constructor with the same parameter
>     order as the fields in the class. So if instantiation with the default
>     constructor doesn't work, it would try to look up a constructor
>     based on
>     the fields of the class.
> 
> 
> I think this would make a lot of sense, but it would require some 
> assumptions (e.g. the declared field order is the same as the 
> constructor argument order (and/or the schema order), especially if 
> there are fields of the same type). Probably still worth doing, either 
> under a more limited set of constraints (all fields are of a different 
> type), or as opt-in.
> 
>     Perhaps too much magic, having a dedicated interface for
>     construction is
>     a more programmatic approach.
> 
>     -Max
> 
>     On 30.08.18 16:55, Reuven Lax wrote:
>      > Max,
>      >
>      > Nested Pojos are fully supported, as are nested array/collection
>     and map
>      > types (e.g. if your Pojo contains List<OtherPojo>).
>      >
>      > One limitation right now is that only mutable Pojos are
>     supported. For
>      > example, the following Pojo would _not_ work, because the fields
>     aren't
>      > mutable.
>      >
>      > public class Pojo {
>      >    public final String field;
>      > }
>      >
>      > This is an annoying restriction, because in practice Pojo types
>     often
>      > have final fields. The reason for the restriction is that the most
>      > general way to create an instance of this Pojo (after decoding)
>     is to
>      > instantiate the object and then set the fields one by one (I also
>     assume
>      > that there's a default constructor).  I can remove this
>     restriction if
>      > there is an appropriate constructor or builder interface that
>     lets us
>      > construct the object directly.
>      >
>      > Reuven
>      >

Re: Beam Schemas: current status

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Aug 31, 2018 at 11:22 AM Maximilian Michels <mx...@apache.org> wrote:

> Thanks Reuven. That's an OK restriction. Apache Flink also requires
> non-final fields to be able to generate TypeInformation (~=Schema) from
> PoJos.
>
> I agree that it's not very intuitive for Users.
>
> I suppose it would work to assume a constructor with the same parameter
> order as the fields in the class. So if instantiation with the default
> constructor doesn't work, it would try to look up a constructor based on
> the fields of the class.
>

I think this would make a lot of sense, but it would require some
assumptions (e.g. the declared field order is the same as the constructor
argument order (and/or the schema order), especially if there are fields of
the same type). Probably still worth doing, either under a more limited set
of constraints (all fields are of a different type), or as opt-in.


> Perhaps too much magic, having a dedicated interface for construction is
> a more programmatic approach.
>
> -Max
>

Re: Beam Schemas: current status

Posted by Robert Bradshaw <ro...@google.com>.
It's probably worth publishing this update as a blog post.


Re: Beam Schemas: current status

Posted by Reuven Lax <re...@google.com>.
In addition, JdbcIO is another source that could integrate with schemas.

Another point of integration could be with shared schema registries (such
as the Kafka Schema Registry). Any source can integrate with an external
registry and use it to set the schema on its output.
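
A rough sketch of what that could look like (fetchSchema and ToRowFn are
hypothetical placeholders for source-specific logic; setRowSchema attaches
a schema to a PCollection<Row>):

Schema schema = fetchSchema(registryUrl, topic);  // hypothetical lookup

PCollection<Row> rows = rawRecords
    .apply(ParDo.of(new ToRowFn(schema)))  // hypothetical bytes-to-Row DoFn
    .setRowSchema(schema);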

Reuven

On Fri, Aug 31, 2018 at 12:44 PM Robert Bradshaw <ro...@google.com>
wrote:

> On Fri, Aug 31, 2018 at 5:01 PM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
>> Thanks Reuven for updating the community with this, great work!
>>
>> One small question about IO integration. What kind of integration this is
>> supposed to be?
>>
>
> Two IOs that I would love to see benefit from schemas are BigQuery and
> Avro (and really any source that already has a schema, even CSVs). This of
> course would require querying the source and possibly some of the data at
> pipeline construction time (which has pros and cons). Both of these
> examples also require a schema when writing, which under this scheme could
> be implicit rather than (re)provided by the user.
>
>
>> Are there any IOs that already benefit from Schemas support?
>>
>
>>> >      >   *
>>> >      >
>>> >      >     Schemas still need to be defined in our portability layer
>>> so they
>>> >      >     can be used cross language.
>>> >      >
>>> >      >
>>> >      > If anyone is interested in helping close these gaps, you'll be
>>> >     helping
>>> >      > make Beam a better, more-usable system!
>>> >      >
>>> >      > Reuven
>>> >      >
>>> >
>>>
>>
>>

Re: Beam Schemas: current status

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Aug 31, 2018 at 5:01 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> Thanks Reuven for updating the community on this, great work!
>
> One small question about IO integration. What kind of integration is this
> supposed to be?
>

Two IOs that I would love to see benefit from schemas are BigQuery and Avro
(and really any source that already has a schema, even CSVs). This of
course would require querying the source and possibly some of the data at
pipeline construction time (which has pros and cons). Both of these
examples also require a schema when writing, which under this scheme could
be implicit rather than (re)provided by the user.
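
To make this concrete, here is a rough sketch of what that could look
like. The readRows()/writeRows() names and the implicit write schema are
hypothetical illustrations, not existing Beam API:

// Hypothetical sketch: a source that already knows its schema (BigQuery
// here) hands back schema-aware Rows directly, so the user supplies
// neither a coder nor a schema. readRows() is an illustrative name only.
PCollection<Row> rows =
    pipeline.apply(BigQueryIO.readRows().from("project:dataset.table"));

// On the write side the schema could be implicit: the sink derives the
// table schema from the PCollection's schema instead of taking a
// user-provided TableSchema. writeRows() is likewise illustrative.
rows.apply(BigQueryIO.writeRows().to("project:dataset.output"));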


> Are there any IOs that already benefit from schema support?
>

> On 31 Aug 2018, at 16:46, Reuven Lax <re...@google.com> wrote:
>
> [...]

Re: Beam Schemas: current status

Posted by Alexey Romanenko <ar...@gmail.com>.
Thanks Reuven for updating the community on this, great work!

One small question about IO integration. What kind of integration is this supposed to be? Are there any IOs that already benefit from schema support?

> On 31 Aug 2018, at 16:46, Reuven Lax <re...@google.com> wrote:
>
> [...]


Re: Beam Schemas: current status

Posted by Reuven Lax <re...@google.com>.
On Fri, Aug 31, 2018 at 2:22 AM Maximilian Michels <mx...@apache.org> wrote:

> Thanks Reuven. That's an OK restriction. Apache Flink also requires
> non-final fields to be able to generate TypeInformation (~=Schema) from
> PoJos.
>
> I agree that it's not very intuitive for users.
>
> I suppose it would work to assume a constructor with the same parameter
> order as the fields in the class. So if instantiation with the default
> constructor doesn't work, it would try to look up a constructor based on
> the fields of the class.
>

Actually, Java reflection doesn't guarantee any particular order of fields
or methods when you query them. We would have to look up a constructor with
the exact same parameter names as the fields. Unfortunately, users sometimes
shorten the parameter names when creating such constructors, which would
defeat this. We could also provide a set of dedicated annotations to allow
the user to mark the constructor (or static builder method) used to create
the class.
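
For illustration, such a marker annotation might look like the sketch
below; @SchemaCreate is a hypothetical name chosen for this example, not
an annotation that exists in Beam today:

@DefaultSchema(JavaFieldSchema.class)
public class Pojo {
  public final String field;

  @SchemaCreate  // hypothetical: marks the constructor used after decoding
  public Pojo(String field) {
    this.field = field;
  }
}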


> Perhaps too much magic, having a dedicated interface for construction is
> a more programmatic approach.
>
> -Max
>
> On 30.08.18 16:55, Reuven Lax wrote:
> [...]

Re: Beam Schemas: current status

Posted by Maximilian Michels <mx...@apache.org>.
Thanks Reuven. That's an OK restriction. Apache Flink also requires 
non-final fields to be able to generate TypeInformation (~=Schema) from 
PoJos.

I agree that it's not very intuitive for users.

I suppose it would work to assume a constructor with the same parameter 
order as the fields in the class. So if instantiation with the default 
constructor doesn't work, it would try to look up a constructor based on 
the fields of the class.
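
One way to sketch that lookup (matching by parameter name rather than
order, since reflection does not guarantee any order; this also assumes
the class was compiled with javac -parameters so names survive):

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

class ConstructorLookup {
  // Find a constructor whose parameter names exactly match the class's
  // field names. Without -parameters, names come back as arg0, arg1, ...
  static Constructor<?> findFieldConstructor(Class<?> clazz) {
    Set<String> fieldNames = Arrays.stream(clazz.getDeclaredFields())
        .map(Field::getName).collect(Collectors.toSet());
    for (Constructor<?> ctor : clazz.getDeclaredConstructors()) {
      Set<String> paramNames = Arrays.stream(ctor.getParameters())
          .map(Parameter::getName).collect(Collectors.toSet());
      if (paramNames.equals(fieldNames)) {
        return ctor;
      }
    }
    return null; // fall back to default constructor + field setting
  }
}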

Perhaps too much magic, having a dedicated interface for construction is 
a more programmatic approach.

-Max

On 30.08.18 16:55, Reuven Lax wrote:
> [...]

Re: Beam Schemas: current status

Posted by Reuven Lax <re...@google.com>.
Max,

Nested Pojos are fully supported, as are nested array/collection and map
types (e.g. if your Pojo contains List<OtherPojo>).
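
For example (a sketch; OtherPojo is a placeholder type, and
@DefaultSchema(JavaFieldSchema.class) is the POJO inference annotation
from the original note):

@DefaultSchema(JavaFieldSchema.class)
public class OuterPojo {
  public OtherPojo wrapped;         // inferred as a nested ROW(OtherPojo)
  public List<OtherPojo> children;  // inferred as ARRAY[ROW(OtherPojo)]
}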

One limitation right now is that only mutable Pojos are supported. For
example, the following Pojo would _not_ work, because the fields aren't
mutable.

public class Pojo {
  public final String field;  // final: cannot be set after construction
}

This is an annoying restriction, because in practice Pojo types often have
final fields. The reason for the restriction is that the most general way
to create an instance of this Pojo (after decoding) is to instantiate the
object and then set the fields one by one (I also assume that there's a
default constructor).  I can remove this restriction if there is an
appropriate constructor or builder interface that lets us construct the
object directly.
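
A minimal sketch of that general decode path, under the assumptions
above (default constructor, mutable fields); the names here are
illustrative, not Beam internals:

import java.lang.reflect.Field;
import java.util.Map;

class PojoInstantiator {
  // Instantiate via the no-arg constructor, then set each field
  // reflectively. A final field cannot portably be written this way,
  // which is exactly the restriction described above.
  static <T> T instantiate(Class<T> clazz, Map<String, Object> values)
      throws ReflectiveOperationException {
    T instance = clazz.getDeclaredConstructor().newInstance();
    for (Map.Entry<String, Object> e : values.entrySet()) {
      Field f = clazz.getDeclaredField(e.getKey());
      f.setAccessible(true);
      f.set(instance, e.getValue());
    }
    return instance;
  }
}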

Reuven

On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <mx...@apache.org> wrote:

> That's a cool feature. Are there any limitations for the schema
> inference apart from being a Pojo/Bean? Does it support nested PoJos,
> e.g. "wrapper.field"?
>
> -Max
>
> On 29.08.18 07:40, Reuven Lax wrote:
> > I wanted to send a quick note to the community about the current status
> > of schema-aware PCollections in Beam. As some might remember we had a
> > good discussion last year about the design of these schemas, involving
> > many folks from different parts of the community. I sent a summary
> > earlier this year explaining how schemas has been integrated into the
> > DoFn framework. Much has happened since then, and here are some of the
> > highlights.
> >
> >
> > First, I want to emphasize that all the schema-aware classes are
> > currently marked @Experimental. Nothing is set in stone yet, so if you
> > have questions about any decisions made, please start a discussion!
> >
> >
> >       SQL
> >
> > The first big milestone for schemas was porting all of BeamSQL to use
> > the framework, which was done in pr/5956. This was a lot of work,
> > exposed many bugs in the schema implementation, but now provides great
> > evidence that schemas work!
> >
> >
> >       Schema inference
> >
> > Beam can automatically infer schemas from Java POJOs (objects with
> > public fields) or JavaBean objects (objects with getter/setter methods).
> > Often you can do this by simply annotating the class. For example:
> >
> >
> > @DefaultSchema(JavaFieldSchema.class)
> >
> > publicclassUserEvent{
> >
> > publicStringuserId;
> >
> > publicLatLonglocation;
> >
> > PublicStringcountryCode;
> >
> > publiclongtransactionCost;
> >
> > publicdoubletransactionDuration;
> >
> > publicList<String>traceMessages;
> >
> > };
> >
> >
> > @DefaultSchema(JavaFieldSchema.class)
> >
> > publicclassLatLong{
> >
> > publicdoublelatitude;
> >
> > publicdoublelongitude;
> >
> > }
> >
> >
> > Beam will automatically infer schemas for these classes! So if you have
> > a PCollection<UserEvent>, it will automatically get the following schema:
> >
> >
> > UserEvent:
> >
> >   userId: STRING
> >
> >   location: ROW(LatLong)
> >
> >   countryCode: STRING
> >
> >   transactionCost: INT64
> >
> >   transactionDuration: DOUBLE
> >
> >   traceMessages: ARRAY[STRING]]
> >
> >
> > LatLong:
> >
> >   latitude: DOUBLE
> >
> >   longitude: DOUBLE
> >
> >
> > Now it’s not always possible to annotate the class like this (you may
> > not own the class definition), so you can also explicitly register this
> > using Pipeline:getSchemaRegistry:registerPOJO, and the same for
> JavaBeans.
> >
> >
> >       Coders
> >
> > Beam has a built-in coder for any schema-aware PCollection, largely
> > removing the need for users to care about coders. We generate low-level
> > bytecode (using ByteBuddy) to implement the coder for each schema, so
> > these coders are quite performant. This provides a better default coder
> > for Java POJO objects as well. In the past users were recommended to use
> > AvroCoder for pojos, which many have found inefficient. Now there’s a
> > more-efficient solution.
> >
> >
> >       Utility Transforms
> >
> > Schemas are already useful for implementers of extensions such as SQL,
> > but the goal was to use them to make Beam itself easier to use. To this
> > end, I’ve been implementing a library of transforms that allow for easy
> > manipulation of schema PCollections. So far Filter and Select are
> > merged, Group is about to go out for review (it needs some more javadoc
> > and unit tests), and Join is being developed but doesn’t yet have a
> > final interface.
> >
> >
> > Filter
> >
> > Given a PCollection<LatLong>, I want to keep only those in an area of
> > southern manhattan. Well this is easy!
> >
> >
> > PCollection<LatLong>manhattanEvents =allEvents.apply(Filter
> >
> > .whereFieldName("latitude",lat ->lat <40.720&&lat >40.699)
> >
> > .whereFieldName("longitude",long->long<-73.969&&long>-74.747));
> >
> >
> > Schemas along with lambdas allows us to write this transform
> > declaratively. The Filter transform also allows you to register filter
> > functions that operate on multiple fields at the same time.
> >
> >
> > Select
> >
> > Let’s say that I don’t need all the fields in a row. For instance, I’m
> > only interested in the userId and traceMessages, and don’t care about
> > the location. In that case I can write the following:
> >
> >
> > PCollection<Row>selected
> > =allEvents.apply(Select.fieldNames(“userId”,“traceMessages”));
> >
> >
> > BTW, Beam also keeps track of which fields are accessed by a transform
> > In the future we can automatically insert Selects in front of subgraphs
> > to drop fields that are not referenced in that subgraph.
> >
> >
> > Group
> >
> > Group is one of the more advanced transforms. In its most basic form, it
> > provides a convenient way to group by key:
> >
> >
> > PCollection<KV<Row,Iterable<UserEvent>>byUserAndCountry =
> >
> >     allEvents.apply(Group.byFieldNames(“userId”,“countryCode”));
> >
> >
> > Notice how much more concise this is than using GroupByKey directly!
> >
> >
> > The Group transform really starts to shine however when you start
> > specifying aggregations. You can aggregate any field (or fields) and
> > build up an output schema based on these aggregations. For example:
> >
> >
> > PCollection<KV<Row, Row>> aggregated = allEvents.apply(
> >     Group.byFieldNames("userId", "countryCode")
> >         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
> >         .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
> >         .aggregateField("transactionDuration",
> >             ApproximateQuantilesCombineFn.create(21), "durationHistogram"));
> >
> >
> > This will individually aggregate the specified fields of the input items
> > (by user and country), and generate an output schema for these
> > aggregations. In this case, the output schema will be the following:
> >
> >
> > AggregatedSchema:
> >
> >     total_cost: INT64
> >
> >     top_purchases: ARRAY[INT64]
> >
> >     durationHistogram: ARRAY[DOUBLE]
> >
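> > As a sketch, the aggregated values can then be read back by their
> > output field names, using the standard Row getters:
> >
> > PCollection<Long> totals = aggregated.apply(
> >     MapElements.into(TypeDescriptors.longs())
> >         .via((KV<Row, Row> kv) -> kv.getValue().getInt64("total_cost")));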
> >
> > There are some more utility transforms I've written that are worth
> > looking at, such as Convert (which converts between user types that
> > share a schema) and Unnest (which flattens nested schemas). There are
> > also others, such as Pivot, that we should consider writing.
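> >
> > For example, a sketch of Convert (UserIdAndTraces is a hypothetical
> > POJO whose fields match the schema of the selected rows):
> >
> > PCollection<UserIdAndTraces> typed =
> >     selected.apply(Convert.to(UserIdAndTraces.class));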
> >
> >
> > There is still a lot to do. All the todo items are reflected in JIRA;
> > however, here are some examples of current gaps:
> >
> >   * Support for read-only POJOs (those with final fields) and JavaBeans
> >     (objects without setters).
> >
> >   * Automatic schema inference from more Java types: protocol buffers,
> >     Avro, AutoValue, etc.
> >
> >   * Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).
> >
> >   * Support for JsonPath expressions so users can better express nested
> >     fields, e.g. expressions of the form
> >     Select.fields("field1.field2", "field3.*", "field4[0].field5");
> >
> >   * Schemas still need to be defined in our portability layer so they
> >     can be used cross-language.
> >
> >
> > If anyone is interested in helping close these gaps, you'll be helping
> > make Beam a better, more usable system!
> >
> > Reuven
> >
>

Re: Beam Schemas: current status

Posted by Maximilian Michels <mx...@apache.org>.
That's a cool feature. Are there any limitations for the schema
inference apart from being a POJO/Bean? Does it support nested POJOs,
e.g. "wrapper.field"?

-Max


Re: Beam Schemas: current status

Posted by Etienne Chauchot <ec...@apache.org>.
Very impressive, thanks for your work Reuven!
Etienne

Re: Beam Schemas: current status

Posted by Connell O'Callaghan <co...@google.com>.
Nice work Reuven!!!


Re: Beam Schemas: current status

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Nice feature, thanks Reuven!

I started to revamp the Spark runner with the Dataset API, and I will
leverage this!

Regards
JB

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com