Posted to dev@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2018/09/06 13:04:20 UTC

Re: Beam Schemas: current status

It's probably worth publishing this update as a blog post.

On Fri, Aug 31, 2018 at 9:58 PM Reuven Lax <re...@google.com> wrote:

> In addition, JdbcIO is another source that could integrate with schemas.
>
> Another point of integration could be with shared schema registries (such
> as the Kafka Schema Registry). Any source can integrate with an external
> registry and use it to set the schema on the output.
>
> Reuven
>
> On Fri, Aug 31, 2018 at 12:44 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Fri, Aug 31, 2018 at 5:01 PM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> Thanks Reuven for updating community with this, great work!
>>>
>>> One small question about IO integration. What kind of integration is
>>> this supposed to be?
>>>
>>
>> Two IOs that I would love to see benefit from schemas are BigQuery and
>> Avro (and really any source that already has a schema, even CSVs). This of
>> course would require querying the source and possibly some of the data at
>> pipeline construction time (which has pros and cons). Both of these
>> examples also require a schema when writing, which under this scheme could
>> be implicit rather than (re)provided by the user.
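>>
>> To make that concrete, writing schema'd rows to BigQuery could then look
>> something like this (a sketch of a possible API shape, not anything that
>> exists today):
>>
>> PCollection<Row> rows = ...;
>> rows.apply(BigQueryIO.<Row>write()
>>     .to("project:dataset.table")
>>     // Derive the BigQuery table schema from the Beam schema rather
>>     // than requiring the user to (re)provide it.
>>     .useBeamSchema());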
>>
>>
>>> Are there any IOs that already benefit from schema support?
>>>
>>
>>> On 31 Aug 2018, at 16:46, Reuven Lax <re...@google.com> wrote:
>>>
>>>
>>>
>>> On Fri, Aug 31, 2018 at 2:22 AM Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>
>>>> Thanks Reuven. That's an OK restriction. Apache Flink also requires
>>>> non-final fields to be able to generate TypeInformation (~= Schema)
>>>> from POJOs.
>>>>
>>>> I agree that it's not very intuitive for users.
>>>>
>>>> I suppose it would work to assume a constructor with the same
>>>> parameter order as the fields in the class. So if instantiation with
>>>> the default constructor doesn't work, it would try to look up a
>>>> constructor based on the fields of the class.
>>>>
>>>
>>> Actually, Java reflection doesn't guarantee any particular order of
>>> fields or methods when you query them. We would have to look up a
>>> constructor with the exact same parameter names as the fields.
>>> Unfortunately users sometimes shorten the parameter names when creating
>>> such constructors, which would defeat this. We could also provide a set
>>> of dedicated annotations to allow the user to mark the constructor (or
>>> static builder method) used to create the class.
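>>>
>>> To sketch the annotation idea (@SchemaCreate here is a hypothetical
>>> name; nothing like this is implemented yet):
>>>
>>> @DefaultSchema(JavaFieldSchema.class)
>>> public class Pojo {
>>>   public final String field;
>>>
>>>   // A dedicated annotation would tell Beam to use this constructor
>>>   // when decoding, rather than requiring a default constructor plus
>>>   // mutable fields. A static builder method could be marked similarly.
>>>   @SchemaCreate
>>>   public Pojo(String field) {
>>>     this.field = field;
>>>   }
>>> }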
>>>
>>>
>>>> Perhaps too much magic; having a dedicated interface for construction
>>>> is a more programmatic approach.
>>>>
>>>> -Max
>>>>
>>>> On 30.08.18 16:55, Reuven Lax wrote:
>>>> > Max,
>>>> >
>>>> > Nested POJOs are fully supported, as are nested array/collection
>>>> > and map types (e.g. if your POJO contains List<OtherPojo>).
>>>> >
>>>> > One limitation right now is that only mutable POJOs are supported.
>>>> > For example, the following POJO would _not_ work, because the
>>>> > fields aren't mutable.
>>>> >
>>>> > public class Pojo {
>>>> >    public final String field;
>>>> > }
>>>> >
>>>> > This is an annoying restriction, because in practice POJO types
>>>> > often have final fields. The reason for the restriction is that the
>>>> > most general way to create an instance of this POJO (after decoding)
>>>> > is to instantiate the object and then set the fields one by one (I
>>>> > also assume that there's a default constructor). I can remove this
>>>> > restriction if there is an appropriate constructor or builder
>>>> > interface that lets us construct the object directly.
>>>> >
>>>> > Reuven
>>>> >
>>>> > On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <mxm@apache.org>
>>>> > wrote:
>>>> >
>>>> >     That's a cool feature. Are there any limitations for the schema
>>>> >     inference apart from being a POJO/Bean? Does it support nested
>>>> >     POJOs, e.g. "wrapper.field"?
>>>> >
>>>> >     -Max
>>>> >
>>>> >     On 29.08.18 07:40, Reuven Lax wrote:
>>>> >      > I wanted to send a quick note to the community about the
>>>> >      > current status of schema-aware PCollections in Beam. As some
>>>> >      > might remember, we had a good discussion last year about the
>>>> >      > design of these schemas, involving many folks from different
>>>> >      > parts of the community. I sent a summary earlier this year
>>>> >      > explaining how schemas have been integrated into the DoFn
>>>> >      > framework. Much has happened since then, and here are some of
>>>> >      > the highlights.
>>>> >      >
>>>> >      >
>>>> >      > First, I want to emphasize that all the schema-aware classes
>>>> >      > are currently marked @Experimental. Nothing is set in stone
>>>> >      > yet, so if you have questions about any decisions made, please
>>>> >      > start a discussion!
>>>> >      >
>>>> >      >
>>>> >      >       SQL
>>>> >      >
>>>> >      > The first big milestone for schemas was porting all of BeamSQL
>>>> >      > to use the framework, which was done in pr/5956. This was a
>>>> >      > lot of work and exposed many bugs in the schema
>>>> >      > implementation, but it now provides great evidence that
>>>> >      > schemas work!
>>>> >      >
>>>> >      >
>>>> >      >       Schema inference
>>>> >      >
>>>> >      > Beam can automatically infer schemas from Java POJOs (objects
>>>> >      > with public fields) or JavaBean objects (objects with
>>>> >      > getter/setter methods). Often you can do this by simply
>>>> >      > annotating the class. For example:
>>>> >      >
>>>> >      >
>>>> >      > @DefaultSchema(JavaFieldSchema.class)
>>>> >      > public class UserEvent {
>>>> >      >   public String userId;
>>>> >      >   public LatLong location;
>>>> >      >   public String countryCode;
>>>> >      >   public long transactionCost;
>>>> >      >   public double transactionDuration;
>>>> >      >   public List<String> traceMessages;
>>>> >      > }
>>>> >      >
>>>> >      > @DefaultSchema(JavaFieldSchema.class)
>>>> >      > public class LatLong {
>>>> >      >   public double latitude;
>>>> >      >   public double longitude;
>>>> >      > }
>>>> >      >
>>>> >      >
>>>> >      > Beam will automatically infer schemas for these classes! So if
>>>> >      > you have a PCollection<UserEvent>, it will automatically get
>>>> >      > the following schema:
>>>> >      >
>>>> >      >
>>>> >      > UserEvent:
>>>> >      >   userId: STRING
>>>> >      >   location: ROW(LatLong)
>>>> >      >   countryCode: STRING
>>>> >      >   transactionCost: INT64
>>>> >      >   transactionDuration: DOUBLE
>>>> >      >   traceMessages: ARRAY[STRING]
>>>> >      >
>>>> >      > LatLong:
>>>> >      >   latitude: DOUBLE
>>>> >      >   longitude: DOUBLE
>>>> >      >
>>>> >      >
>>>> >      > Now it’s not always possible to annotate the class like this
>>>> >      > (you may not own the class definition), so you can also
>>>> >      > explicitly register the schema using
>>>> >      > Pipeline.getSchemaRegistry().registerPOJO, and the same for
>>>> >      > JavaBeans.
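>>>> >      >
>>>> >      > A minimal sketch of that registration (assuming the registry
>>>> >      > call keeps this shape while everything is @Experimental):
>>>> >      >
>>>> >      > Pipeline p = Pipeline.create(options);
>>>> >      > // Register a schema for a class we don't own and therefore
>>>> >      > // can't annotate with @DefaultSchema.
>>>> >      > p.getSchemaRegistry().registerPOJO(UserEvent.class);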
>>>> >      >
>>>> >      >
>>>> >      >       Coders
>>>> >      >
>>>> >      > Beam has a built-in coder for any schema-aware PCollection,
>>>> >      > largely removing the need for users to care about coders. We
>>>> >      > generate low-level bytecode (using ByteBuddy) to implement the
>>>> >      > coder for each schema, so these coders are quite performant.
>>>> >      > This provides a better default coder for Java POJO objects as
>>>> >      > well. In the past, AvroCoder was the recommended coder for
>>>> >      > POJOs, which many have found inefficient. Now there’s a more
>>>> >      > efficient solution.
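>>>> >      >
>>>> >      > Concretely, the before/after looks roughly like this (a
>>>> >      > sketch):
>>>> >      >
>>>> >      > // Before: explicitly attach a reflection-based Avro coder.
>>>> >      > events.setCoder(AvroCoder.of(UserEvent.class));
>>>> >      >
>>>> >      > // Now: with @DefaultSchema (or a registry entry), the
>>>> >      > // inferred schema supplies a generated coder automatically;
>>>> >      > // no setCoder() call is needed.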
>>>> >      >
>>>> >      >
>>>> >      >       Utility Transforms
>>>> >      >
>>>> >      > Schemas are already useful for implementers of extensions
>>>> >      > such as SQL, but the goal was to use them to make Beam itself
>>>> >      > easier to use. To this end, I’ve been implementing a library
>>>> >      > of transforms that allow for easy manipulation of schema
>>>> >      > PCollections. So far Filter and Select are merged, Group is
>>>> >      > about to go out for review (it needs some more javadoc and
>>>> >      > unit tests), and Join is being developed but doesn’t yet have
>>>> >      > a final interface.
>>>> >      >
>>>> >      >
>>>> >      > Filter
>>>> >      >
>>>> >      > Given a PCollection<LatLong>, I want to keep only those in an
>>>> >      > area of southern Manhattan. Well, this is easy!
>>>> >      >
>>>> >      >
>>>> >      > PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>>>> >      >     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>>>> >      >     .whereFieldName("longitude", lon -> lon < -73.969 && lon > -74.747));
>>>> >      >
>>>> >      >
>>>> >      > Schemas along with lambdas allow us to write this transform
>>>> >      > declaratively. The Filter transform also allows you to
>>>> >      > register filter functions that operate on multiple fields at
>>>> >      > the same time.
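>>>> >      >
>>>> >      > A multi-field filter might look like this (a sketch; I'm
>>>> >      > assuming a whereFieldNames variant whose predicate sees the
>>>> >      > named fields as a Row):
>>>> >      >
>>>> >      > // Keep events whose cost exceeds 100x their duration,
>>>> >      > // a predicate over two fields at once.
>>>> >      > allEvents.apply(Filter.whereFieldNames(
>>>> >      >     Arrays.asList("transactionCost", "transactionDuration"),
>>>> >      >     row -> row.getInt64("transactionCost")
>>>> >      >         > 100 * row.getDouble("transactionDuration")));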
>>>> >      >
>>>> >      >
>>>> >      > Select
>>>> >      >
>>>> >      > Let’s say that I don’t need all the fields in a row. For
>>>> >      > instance, I’m only interested in the userId and traceMessages,
>>>> >      > and don’t care about the location. In that case I can write
>>>> >      > the following:
>>>> >      >
>>>> >      >
>>>> >      > PCollection<Row> selected =
>>>> >      >     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
>>>> >      >
>>>> >      >
>>>> >      > BTW, Beam also keeps track of which fields are accessed by a
>>>> >      > transform. In the future we can automatically insert Selects
>>>> >      > in front of subgraphs to drop fields that are not referenced
>>>> >      > in that subgraph.
>>>> >      >
>>>> >      >
>>>> >      > Group
>>>> >      >
>>>> >      > Group is one of the more advanced transforms. In its most
>>>> >      > basic form, it provides a convenient way to group by key:
>>>> >      >
>>>> >      >
>>>> >      > PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>>>> >      >     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>>>> >      >
>>>> >      >
>>>> >      > Notice how much more concise this is than using GroupByKey
>>>> >      > directly!
>>>> >      >
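>>>> >      > For comparison, here is roughly what the raw GroupByKey
>>>> >      > version requires (a sketch): the key must be extracted by
>>>> >      > hand and its type spelled out for coder inference.
>>>> >      >
>>>> >      > PCollection<KV<KV<String, String>, Iterable<UserEvent>>> grouped =
>>>> >      >     allEvents
>>>> >      >         // Manually pull the key fields out into a KV.
>>>> >      >         .apply(WithKeys.of((UserEvent e) -> KV.of(e.userId, e.countryCode))
>>>> >      >             .withKeyType(TypeDescriptors.kvs(
>>>> >      >                 TypeDescriptors.strings(), TypeDescriptors.strings())))
>>>> >      >         .apply(GroupByKey.create());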
>>>> >      >
>>>> >      > The Group transform really starts to shine, however, when you
>>>> >      > start specifying aggregations. You can aggregate any field (or
>>>> >      > fields) and build up an output schema based on these
>>>> >      > aggregations. For example:
>>>> >      >
>>>> >      >
>>>> >      > PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>>>> >      >     Group.byFieldNames("userId", "countryCode")
>>>> >      >         .aggregateField("cost", Sum.ofLongs(), "total_cost")
>>>> >      >         .aggregateField("cost", Top.<Long>largestFn(10), "top_purchases")
>>>> >      >         .aggregateField("transactionDuration",
>>>> >      >             ApproximateQuantilesCombineFn.create(21),
>>>> >      >             "durationHistogram"));
>>>> >      >
>>>> >      >
>>>> >      > This will individually aggregate the specified fields of the
>>>> >      > input items (by user and country), and generate an output
>>>> >      > schema for these aggregations. In this case, the output schema
>>>> >      > will be the following:
>>>> >      >
>>>> >      >
>>>> >      > AggregatedSchema:
>>>> >      >   total_cost: INT64
>>>> >      >   top_purchases: ARRAY[INT64]
>>>> >      >   durationHistogram: ARRAY[DOUBLE]
>>>> >      >
>>>> >      >
>>>> >      > There are some more utility transforms I've written that are
>>>> >      > worth looking at, such as Convert (which can convert between
>>>> >      > user types that share a schema) and Unnest (which flattens
>>>> >      > nested schemas). There are also some others, such as Pivot,
>>>> >      > that we should consider writing.
>>>> >      >
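>>>> >      > For example, Convert lets you move between representations
>>>> >      > that share a schema (a sketch; MyEvent is a hypothetical user
>>>> >      > type whose inferred schema matches UserEvent's):
>>>> >      >
>>>> >      > // Convert a PCollection of POJOs to generic Rows...
>>>> >      > PCollection<Row> rows = allEvents.apply(Convert.toRows());
>>>> >      > // ...and back into a different user type with the same schema.
>>>> >      > PCollection<MyEvent> events = rows.apply(Convert.to(MyEvent.class));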
>>>> >      >
>>>> >      > There is still a lot to do. All the todo items are reflected
>>>> >      > in JIRA; however, here are some examples of current gaps:
>>>> >      >
>>>> >      >
>>>> >      >   * Support for read-only POJOs (those with final fields) and
>>>> >      >     JavaBeans (objects without setters).
>>>> >      >
>>>> >      >   * Automatic schema inference from more Java types: protocol
>>>> >      >     buffers, Avro, AutoValue, etc.
>>>> >      >
>>>> >      >   * Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).
>>>> >      >
>>>> >      >   * Support for JsonPath expressions so users can better
>>>> >      >     express nested fields, e.g. expressions of the form
>>>> >      >     Select.fields("field1.field2", "field3.*", "field4[0].field5").
>>>> >      >
>>>> >      >   * Schemas still need to be defined in our portability layer
>>>> >      >     so they can be used cross-language.
>>>> >      >
>>>> >      >
>>>> >      > If anyone is interested in helping close these gaps, you'll
>>>> >      > be helping make Beam a better, more usable system!
>>>> >      >
>>>> >      > Reuven
>>>> >      >
>>>> >
>>>>
>>>
>>>