Posted to dev@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2018/09/06 13:04:20 UTC
Re: Beam Schemas: current status
It's probably worth publishing this update as a blog post.
On Fri, Aug 31, 2018 at 9:58 PM Reuven Lax <re...@google.com> wrote:
> In addition, JdbcIO is another source that could integrate with schemas.
>
> Another point of integration could be with shared schema registries (such
> as Kafka Schema Registry). Any source can integrate with an external
> registry and use it to set the schema on the output.
>
> Reuven
>
> On Fri, Aug 31, 2018 at 12:44 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Fri, Aug 31, 2018 at 5:01 PM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> Thanks Reuven for updating the community with this, great work!
>>>
>>> One small question about IO integration. What kind of integration is
>>> this supposed to be?
>>>
>>
>> Two IOs that I would love to see benefit from schemas are BigQuery and
>> Avro (and really any source that already has a schema, even CSVs). This of
>> course would require querying the source and possibly some of the data at
>> pipeline construction time (which has pros and cons). Both of these
>> examples also require a schema when writing, which under this scheme could
>> be implicit rather than (re)provided by the user.
>>
>>
>>> Are there any IOs that already benefit from Schemas support?
>>>
>>
>>> On 31 Aug 2018, at 16:46, Reuven Lax <re...@google.com> wrote:
>>>
>>>
>>>
>>> On Fri, Aug 31, 2018 at 2:22 AM Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>
>>>> Thanks Reuven. That's an OK restriction. Apache Flink also requires
>>>> non-final fields to be able to generate TypeInformation (~=Schema) from
>>>> PoJos.
>>>>
>>>> I agree that it's not very intuitive for Users.
>>>>
>>>> I suppose it would work to assume a constructor with the same parameter
>>>> order as the fields in the class. So if instantiation with the default
>>>> constructor doesn't work, it would try to look up a constructor based on
>>>> the fields of the class.
>>>>
>>>
>>> Actually, Java reflection doesn't guarantee any particular order of
>>> fields or methods when you query them. We would have to look for a constructor
>>> with the exact same parameter names as the fields. Unfortunately users
>>> sometimes shorten the parameter names when creating such constructors,
>>> which would defeat this. We could also provide a set of dedicated
>>> annotations to allow the user to mark the constructor (or static builder
>>> method) used to create the class.
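One way to picture the annotation idea is the minimal sketch below. It is plain Java with no Beam dependency, and every name in it (CreateWith, FieldName, ImmutablePojo, create) is hypothetical, not an existing Beam API. Each parameter carries an explicit field-name annotation because reflection only exposes real parameter names when the class is compiled with -parameters.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.util.Map;

final class ConstructorSketch {
  /** Hypothetical marker: "use this constructor to create instances". */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.CONSTRUCTOR)
  @interface CreateWith {}

  /** Hypothetical marker: ties a constructor parameter to a field name. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  @interface FieldName {
    String value();
  }

  /** An immutable POJO: all fields are final, no default constructor. */
  static final class ImmutablePojo {
    public final String field;
    public final long count;

    @CreateWith
    ImmutablePojo(@FieldName("field") String f, @FieldName("count") long c) {
      this.field = f;  // parameter names are shortened on purpose
      this.count = c;
    }
  }

  /** Builds an instance by passing decoded field values to the marked constructor. */
  static <T> T create(Class<T> type, Map<String, Object> values) {
    for (Constructor<?> ctor : type.getDeclaredConstructors()) {
      if (!ctor.isAnnotationPresent(CreateWith.class)) {
        continue;
      }
      Annotation[][] paramAnnotations = ctor.getParameterAnnotations();
      Object[] args = new Object[paramAnnotations.length];
      for (int i = 0; i < args.length; i++) {
        String name = null;
        for (Annotation a : paramAnnotations[i]) {
          if (a instanceof FieldName) {
            name = ((FieldName) a).value();
          }
        }
        if (name == null || !values.containsKey(name)) {
          throw new IllegalArgumentException("No value for parameter " + i);
        }
        args[i] = values.get(name);
      }
      try {
        ctor.setAccessible(true);
        return type.cast(ctor.newInstance(args));
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException(e);
      }
    }
    throw new IllegalArgumentException("No @CreateWith constructor on " + type);
  }
}
```

Calling ConstructorSketch.create(ConstructorSketch.ImmutablePojo.class, Map.of("field", "abc", "count", 7L)) builds the object without touching a setter, so the fields can stay final even though the constructor's parameter names differ from the field names.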
>>>
>>>
>>>> Perhaps that is too much magic; having a dedicated interface for
>>>> construction is a more programmatic approach.
>>>>
>>>> -Max
>>>>
>>>> On 30.08.18 16:55, Reuven Lax wrote:
>>>> > Max,
>>>> >
>>>> > Nested Pojos are fully supported, as are nested array/collection and map
>>>> > types (e.g. if your Pojo contains List<OtherPojo>).
>>>> >
>>>> > One limitation right now is that only mutable Pojos are supported. For
>>>> > example, the following Pojo would _not_ work, because the fields aren't
>>>> > mutable.
>>>> >
>>>> > public class Pojo {
>>>> >   public final String field;
>>>> > }
>>>> >
>>>> > This is an annoying restriction, because in practice Pojo types often
>>>> > have final fields. The reason for the restriction is that the most
>>>> > general way to create an instance of this Pojo (after decoding) is to
>>>> > instantiate the object and then set the fields one by one (I also assume
>>>> > that there's a default constructor). I can remove this restriction if
>>>> > there is an appropriate constructor or builder interface that lets us
>>>> > construct the object directly.
>>>> >
>>>> > Reuven
>>>> >
>>>> > On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <mxm@apache.org
>>>> > <ma...@apache.org>> wrote:
>>>> >
>>>> > That's a cool feature. Are there any limitations for the schema
>>>> > inference apart from being a Pojo/Bean? Does it support nested PoJos,
>>>> > e.g. "wrapper.field"?
>>>> >
>>>> > -Max
>>>> >
>>>> > On 29.08.18 07:40, Reuven Lax wrote:
>>>> > > I wanted to send a quick note to the community about the current status
>>>> > > of schema-aware PCollections in Beam. As some might remember, we had a
>>>> > > good discussion last year about the design of these schemas, involving
>>>> > > many folks from different parts of the community. I sent a summary
>>>> > > earlier this year explaining how schemas have been integrated into the
>>>> > > DoFn framework. Much has happened since then, and here are some of the
>>>> > > highlights.
>>>> > >
>>>> > >
>>>> > > First, I want to emphasize that all the schema-aware classes are
>>>> > > currently marked @Experimental. Nothing is set in stone yet, so if you
>>>> > > have questions about any decisions made, please start a discussion!
>>>> > >
>>>> > >
>>>> > > SQL
>>>> > >
>>>> > > The first big milestone for schemas was porting all of BeamSQL to use
>>>> > > the framework, which was done in pr/5956. This was a lot of work and
>>>> > > exposed many bugs in the schema implementation, but it now provides
>>>> > > great evidence that schemas work!
>>>> > >
>>>> > >
>>>> > > Schema inference
>>>> > >
>>>> > > Beam can automatically infer schemas from Java POJOs (objects with
>>>> > > public fields) or JavaBean objects (objects with getter/setter methods).
>>>> > > Often you can do this by simply annotating the class. For example:
>>>> > >
>>>> > >
>>>> > > @DefaultSchema(JavaFieldSchema.class)
>>>> > > public class UserEvent {
>>>> > >   public String userId;
>>>> > >   public LatLong location;
>>>> > >   public String countryCode;
>>>> > >   public long transactionCost;
>>>> > >   public double transactionDuration;
>>>> > >   public List<String> traceMessages;
>>>> > > }
>>>> > >
>>>> > > @DefaultSchema(JavaFieldSchema.class)
>>>> > > public class LatLong {
>>>> > >   public double latitude;
>>>> > >   public double longitude;
>>>> > > }
>>>> > >
>>>> > >
>>>> > > Beam will automatically infer schemas for these classes! So if you have
>>>> > > a PCollection<UserEvent>, it will automatically get the following
>>>> > > schema:
>>>> > >
>>>> > >
>>>> > > UserEvent:
>>>> > >   userId: STRING
>>>> > >   location: ROW(LatLong)
>>>> > >   countryCode: STRING
>>>> > >   transactionCost: INT64
>>>> > >   transactionDuration: DOUBLE
>>>> > >   traceMessages: ARRAY[STRING]
>>>> > >
>>>> > > LatLong:
>>>> > >   latitude: DOUBLE
>>>> > >   longitude: DOUBLE
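To make the field-to-schema mapping concrete, here is a minimal sketch of reflection-based inference in plain Java. It is not Beam's implementation (which generates bytecode and handles many more types); the class InferenceSketch, its methods, and the string type names are assumptions chosen to mirror the listing above. The result is sorted by field name because Class.getDeclaredFields() returns fields in no particular order.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class InferenceSketch {
  // Stand-ins for the POJOs from the example (no Beam annotations needed here).
  static final class LatLong {
    public double latitude;
    public double longitude;
  }

  static final class UserEvent {
    public String userId;
    public LatLong location;
    public String countryCode;
    public long transactionCost;
    public double transactionDuration;
    public List<String> traceMessages;
  }

  /** Maps each public instance field to a schema type name, sorted by field name. */
  static Map<String, String> inferSchema(Class<?> pojo) {
    Map<String, String> schema = new TreeMap<>();
    for (Field f : pojo.getDeclaredFields()) {
      int mods = f.getModifiers();
      if (f.isSynthetic() || Modifier.isStatic(mods) || !Modifier.isPublic(mods)) {
        continue;  // only public instance fields take part in the schema
      }
      schema.put(f.getName(), typeName(f.getGenericType()));
    }
    return schema;
  }

  /** Translates a Java type into a schema type name; anything unknown is a nested row. */
  static String typeName(Type t) {
    if (t == String.class) return "STRING";
    if (t == long.class || t == Long.class) return "INT64";
    if (t == int.class || t == Integer.class) return "INT32";
    if (t == double.class || t == Double.class) return "DOUBLE";
    if (t == boolean.class || t == Boolean.class) return "BOOLEAN";
    if (t instanceof ParameterizedType) {
      ParameterizedType p = (ParameterizedType) t;
      Type raw = p.getRawType();
      if (raw instanceof Class && List.class.isAssignableFrom((Class<?>) raw)) {
        return "ARRAY[" + typeName(p.getActualTypeArguments()[0]) + "]";
      }
    }
    if (t instanceof Class) {
      return "ROW(" + ((Class<?>) t).getSimpleName() + ")";
    }
    throw new IllegalArgumentException("Unsupported type: " + t);
  }
}
```

InferenceSketch.inferSchema(InferenceSketch.UserEvent.class) yields one entry per public field, with location mapped to ROW(LatLong) and traceMessages to ARRAY[STRING].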
>>>> > >
>>>> > >
>>>> > > Now it’s not always possible to annotate the class like this (you may
>>>> > > not own the class definition), so you can also explicitly register this
>>>> > > using Pipeline:getSchemaRegistry:registerPOJO, and the same for
>>>> > > JavaBeans.
>>>> > >
>>>> > >
>>>> > > Coders
>>>> > >
>>>> > > Beam has a built-in coder for any schema-aware PCollection, largely
>>>> > > removing the need for users to care about coders. We generate low-level
>>>> > > bytecode (using ByteBuddy) to implement the coder for each schema, so
>>>> > > these coders are quite performant. This provides a better default coder
>>>> > > for Java POJO objects as well. In the past, users were advised to use
>>>> > > AvroCoder for POJOs, which many have found inefficient. Now there’s a
>>>> > > more efficient solution.
>>>> > >
>>>> > >
>>>> > > Utility Transforms
>>>> > >
>>>> > > Schemas are already useful for implementers of extensions such as SQL,
>>>> > > but the goal was to use them to make Beam itself easier to use. To this
>>>> > > end, I’ve been implementing a library of transforms that allow for easy
>>>> > > manipulation of schema PCollections. So far Filter and Select are
>>>> > > merged, Group is about to go out for review (it needs some more javadoc
>>>> > > and unit tests), and Join is being developed but doesn’t yet have a
>>>> > > final interface.
>>>> > >
>>>> > >
>>>> > > Filter
>>>> > >
>>>> > > Given a PCollection<LatLong>, I want to keep only those in an area of
>>>> > > southern Manhattan. Well, this is easy!
>>>> > >
>>>> > >
>>>> > > PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>>>> > >     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>>>> > >     .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
>>>> > >
>>>> > >
>>>> > > Schemas along with lambdas allow us to write this transform
>>>> > > declaratively. The Filter transform also allows you to register filter
>>>> > > functions that operate on multiple fields at the same time.
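The by-name filtering idea can be sketched outside Beam as follows. This is only an illustration under stated assumptions: rows are modelled as Map<String, Object>, and FilterSketch with its whereFieldName and apply methods is a hypothetical stand-in, not the Beam Filter transform. Note that a lambda parameter cannot be called long in Java, since long is a reserved word, so the sketch uses lng.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class FilterSketch {
  // One row-level predicate per registered field condition.
  private final List<Predicate<Map<String, Object>>> predicates = new ArrayList<>();

  /** Registers a condition on a single named field; all conditions must hold. */
  @SuppressWarnings("unchecked")
  <F> FilterSketch whereFieldName(String name, Predicate<F> condition) {
    // The cast is unchecked: the caller is trusted to match the field's type.
    predicates.add(row -> condition.test((F) row.get(name)));
    return this;
  }

  /** Returns the rows that satisfy every registered condition, in input order. */
  List<Map<String, Object>> apply(List<Map<String, Object>> rows) {
    List<Map<String, Object>> kept = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      if (predicates.stream().allMatch(p -> p.test(row))) {
        kept.add(row);
      }
    }
    return kept;
  }
}
```

A caller would chain conditions much like the example above, for instance new FilterSketch().whereFieldName("latitude", (Double lat) -> lat < 40.720 && lat > 40.699).whereFieldName("longitude", (Double lng) -> lng < -73.969 && lng > -74.747).apply(rows). The explicit (Double lat) parameter type is needed in this sketch because nothing else tells the compiler the field's type; a schema-aware framework can supply that information itself.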
>>>> > >
>>>> > >
>>>> > > Select
>>>> > >
>>>> > > Let’s say that I don’t need all the fields in a row. For instance, I’m
>>>> > > only interested in the userId and traceMessages, and don’t care about
>>>> > > the location. In that case I can write the following:
>>>> > >
>>>> > >
>>>> > > PCollection<Row> selected =
>>>> > >     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
>>>> > >
>>>> > >
>>>> > > BTW, Beam also keeps track of which fields are accessed by a transform.
>>>> > > In the future we can automatically insert Selects in front of subgraphs
>>>> > > to drop fields that are not referenced in that subgraph.
>>>> > >
>>>> > >
>>>> > > Group
>>>> > >
>>>> > > Group is one of the more advanced transforms. In its most basic form, it
>>>> > > provides a convenient way to group by key:
>>>> > >
>>>> > >
>>>> > > PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>>>> > >     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>>>> > >
>>>> > >
>>>> > > Notice how much more concise this is than using GroupByKey directly!
>>>> > >
>>>> > >
>>>> > > The Group transform really starts to shine, however, when you start
>>>> > > specifying aggregations. You can aggregate any field (or fields) and
>>>> > > build up an output schema based on these aggregations. For example:
>>>> > >
>>>> > >
>>>> > > PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>>>> > >     Group.byFieldNames("userId", "countryCode")
>>>> > >         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
>>>> > >         .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
>>>> > >         .aggregateField("transactionDuration",
>>>> > >             ApproximateQuantilesCombineFn.create(21), "durationHistogram"));
>>>> > >
>>>> > >
>>>> > > This will individually aggregate the specified fields of the input items
>>>> > > (by user and country), and generate an output schema for these
>>>> > > aggregations. In this case, the output schema will be the following:
>>>> > >
>>>> > >
>>>> > > AggregatedSchema:
>>>> > >   total_cost: INT64
>>>> > >   top_purchases: ARRAY[INT64]
>>>> > >   durationHistogram: ARRAY[DOUBLE]
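For readers who want to see what the first aggregation computes, the sketch below reproduces the total_cost column with plain Java streams instead of Beam. It is an in-memory analogy only: GroupSketch, Event, and totalCostByUserAndCountry are hypothetical names, and a List<String> of (userId, countryCode) stands in for the key Row.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupSketch {
  /** Trimmed-down event carrying only the fields used by the aggregation. */
  static final class Event {
    final String userId;
    final String countryCode;
    final long transactionCost;

    Event(String userId, String countryCode, long transactionCost) {
      this.userId = userId;
      this.countryCode = countryCode;
      this.transactionCost = transactionCost;
    }
  }

  /** Groups by (userId, countryCode) and sums transactionCost within each group. */
  static Map<List<String>, Long> totalCostByUserAndCountry(List<Event> events) {
    return events.stream().collect(Collectors.groupingBy(
        e -> List.of(e.userId, e.countryCode),          // composite grouping key
        Collectors.summingLong(e -> e.transactionCost)  // per-group aggregate
    ));
  }
}
```

Each additional aggregateField call in the Beam example would correspond to one more per-group aggregate here; the point of the schema-based API is that the output columns and their types are derived automatically from those aggregates.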
>>>> > >
>>>> > >
>>>> > > There are some more utility transforms I've written that are worth
>>>> > > looking at, such as Convert (which can convert between user types that
>>>> > > share a schema) and Unnest (which flattens nested schemas). There are
>>>> > > also some others, such as Pivot, that we should consider writing.
>>>> > >
>>>> > >
>>>> > > There is still a lot to do. All the to-do items are tracked in JIRA,
>>>> > > but here are some examples of current gaps:
>>>> > >
>>>> > >
>>>> > > * Support for read-only POJOs (those with final fields) and JavaBeans
>>>> > >   (objects without setters).
>>>> > >
>>>> > > * Automatic schema inference from more Java types: protocol buffers,
>>>> > >   Avro, AutoValue, etc.
>>>> > >
>>>> > > * Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
>>>> > >
>>>> > > * Support for JsonPath expressions so users can better express nested
>>>> > >   fields, e.g. support expressions of the form
>>>> > >   Select.fields("field1.field2", "field3.*", "field4[0].field5");
>>>> > >
>>>> > > * Schemas still need to be defined in our portability layer so they
>>>> > >   can be used cross-language.
>>>> > >
>>>> > >
>>>> > > If anyone is interested in helping close these gaps, you'll be helping
>>>> > > make Beam a better, more-usable system!
>>>> > >
>>>> > > Reuven
>>>> > >
>>>> >
>>>>
>>>
>>>