You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Pablo Estrada <pa...@google.com> on 2019/07/23 23:40:38 UTC

Re: Choosing a coder for a class that contains a Row?

+dev <de...@beam.apache.org>
Thanks Ryan! This is quite helpful. Still not what I need : ) - but useful.

The data is change data capture from databases, and I'm putting it into a
Beam Row. The schema for the Row is generally homogeneous, but subject to
change at some point in the future if the schema in the database changes.
It's unusual and unlikely, but possible. I have no idea how Beam deals with
evolving schemas. +Reuven Lax <re...@google.com> is there documentation /
examples / anything around this? : )

I think evolving schemas is an interesting question....

For now, I am going to Java-serialize the objects, and delay figuring this
out. But I reckon I'll have to come back to this...

Best
-P.

On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <ry...@skraba.com> wrote:

> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
> pipeline construction time, but can be discovered from the instance of
> MyData?
>
> Once discovered, is the schema "homogeneous" for all instance of
> MyData?  (i.e. someRow will always have the same schema for all
> instances afterwards, and there won't be another someRow with a
> different schema).
>
> We've encountered a parallel "problem" with pure Avro data, where the
> instance is a GenericRecord containing it's own Avro schema but
> *without* knowing the schema until the pipeline is run.  The solution
> that we've been using is a bit hacky, but we're using an ad hoc
> per-job schema registry and a custom coder where each worker saves the
> schema in the `encode` before writing the record, and loads it lazily
> in the `decode` before reading.
>
> The original code is available[1] (be gentle, it was written with Beam
> 0.4.0-incubating... and has continued to work until now).
>
> In practice, the ad hoc schema registry is just a server socket in the
> Spark driver, in-memory for DirectRunner / local mode, and a a
> read/write to a known location in other runners.  There are definitely
> other solutions with side-inputs and providers, and the job server in
> portability looks like an exciting candidate for per-job schema
> registry story...
>
> I'm super eager to see if there are other ideas or a contribution we
> can make in this area that's "Beam Row" oriented!
>
> Ryan
>
> [1]
> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>
> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pa...@google.com> wrote:
> >
> > Hello all,
> > I am writing a utility to push data to PubSub. My data class looks
> something like so:
> > ==========
> > class MyData {
> >   String someId;
> >   Row someRow;
> >   Row someOtherRow;
> > }
> > ==============
> > The schema for the Rows is not known a-priori. It is contained by the
> Row. I am then pushing this data to pubsub:
> > ===========
> > MyData pushingData = ....
> > WhatCoder? coder = ....
> >
> > ByteArrayOutputStream os = new ByteArrayOutputStream();
> > coder.encode(this, os);
> >
> > pubsubClient.connect();
> >
> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
> > pubsubClient.close();
> > =================
> > What's the right coder to use in this case? I don't know if SchemaCoder
> will work, because it seems that it requires the Row's schema a priori. I
> have not been able to make AvroCoder work.
> >
> > Any tips?
> > Best
> > -P.
>

Re: Choosing a coder for a class that contains a Row?

Posted by Reuven Lax <re...@google.com>.
The metadata needed is already there - it's the encoding-position map in
Schema. However the code needs to be written to examine an old schema and a
new one in order to make the new schema encoding-compatible with the old
one. This shouldn't be difficult to write.

On Fri, Jul 26, 2019 at 10:21 AM Kenneth Knowles <ke...@apache.org> wrote:

> The most challenging part, as I understand it, surrounds automatically
> inferred schemas from POJOs, where Java's nondeterministic iteration order,
> combined with a row's inherent ordering, means that even an identical
> pipeline will need some metadata to plumb the right fields to the right
> column indices.
>
> Most relational migration management I've done incorporates explicit
> migration logic along with changes to the schema. This is quite a lot more
> robust, but more implementation work, than having a default policy
> proto/avro/thrift style. I think there's a lot to explore here.
>
> Kenn
>
> On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette <bh...@google.com> wrote:
>
>> I know Reuven has put some thought into evolving schemas, but I'm not
>> sure it's documented anywhere as of now. The only documentation I've come
>> across as I bump around the schema code are some comments deep in RowCoder
>> [1].
>> Essentially the current serialization format for a row includes a row
>> count as a prefix so we can detect "simple" schema changes like column
>> additions and deletions. When decoding a Row, if the current schema
>> contains *more* fields than the encoded Row, the remaining fields are
>> populated with nulls in the resulting Row object. If the current schema
>> contains *fewer* fields than the encoded Row, the additional ones are
>> just dropped.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
>>
>> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <ry...@skraba.com> wrote:
>>
>>> I'm also really interested in the question of evolving schemas... It's
>>> something I've also put off figuring out :D
>>>
>>> With all its warts, the LazyAvroCoder technique (a coder backed by
>>> some sort of schema registry) _could_ work with "homogeneish" data
>>> (i.e. if the number of schemas in play for a single coder is much,
>>> much smaller than the number of elements), even if none of the the
>>> schemas are known at Pipeline construction.  The portability job
>>> server (which already stores and serves artifacts for running jobs)
>>> might be the right place to put a schema registry... but I'm not
>>> entirely convinced it's the right way to go either.
>>>
>>> At the same time, "simply" bumping a known schema to a new version is
>>> roughly equivalent to updating a pipeline in place.
>>>
>>> Sending the data as Java-serialized Rows will be equivalent to sending
>>> the entire schema with every record, so it _would_ work without
>>> involving a new, distributed state between one coders encode and
>>> anothers decode (at the cost of message size, of course).
>>>
>>> Ryan
>>>
>>>
>>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pa...@google.com>
>>> wrote:
>>> >
>>> > +dev
>>> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
>>> useful.
>>> >
>>> > The data is change data capture from databases, and I'm putting it
>>> into a Beam Row. The schema for the Row is generally homogeneous, but
>>> subject to change at some point in the future if the schema in the database
>>> changes. It's unusual and unlikely, but possible. I have no idea how Beam
>>> deals with evolving schemas. +Reuven Lax is there documentation / examples
>>> / anything around this? : )
>>> >
>>> > I think evolving schemas is an interesting question....
>>> >
>>> > For now, I am going to Java-serialize the objects, and delay figuring
>>> this out. But I reckon I'll have to come back to this...
>>> >
>>> > Best
>>> > -P.
>>> >
>>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <ry...@skraba.com> wrote:
>>> >>
>>> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>>> >> pipeline construction time, but can be discovered from the instance of
>>> >> MyData?
>>> >>
>>> >> Once discovered, is the schema "homogeneous" for all instance of
>>> >> MyData?  (i.e. someRow will always have the same schema for all
>>> >> instances afterwards, and there won't be another someRow with a
>>> >> different schema).
>>> >>
>>> >> We've encountered a parallel "problem" with pure Avro data, where the
>>> >> instance is a GenericRecord containing it's own Avro schema but
>>> >> *without* knowing the schema until the pipeline is run.  The solution
>>> >> that we've been using is a bit hacky, but we're using an ad hoc
>>> >> per-job schema registry and a custom coder where each worker saves the
>>> >> schema in the `encode` before writing the record, and loads it lazily
>>> >> in the `decode` before reading.
>>> >>
>>> >> The original code is available[1] (be gentle, it was written with Beam
>>> >> 0.4.0-incubating... and has continued to work until now).
>>> >>
>>> >> In practice, the ad hoc schema registry is just a server socket in the
>>> >> Spark driver, in-memory for DirectRunner / local mode, and a a
>>> >> read/write to a known location in other runners.  There are definitely
>>> >> other solutions with side-inputs and providers, and the job server in
>>> >> portability looks like an exciting candidate for per-job schema
>>> >> registry story...
>>> >>
>>> >> I'm super eager to see if there are other ideas or a contribution we
>>> >> can make in this area that's "Beam Row" oriented!
>>> >>
>>> >> Ryan
>>> >>
>>> >> [1]
>>> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>>> >>
>>> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pa...@google.com>
>>> wrote:
>>> >> >
>>> >> > Hello all,
>>> >> > I am writing a utility to push data to PubSub. My data class looks
>>> something like so:
>>> >> > ==========
>>> >> > class MyData {
>>> >> >   String someId;
>>> >> >   Row someRow;
>>> >> >   Row someOtherRow;
>>> >> > }
>>> >> > ==============
>>> >> > The schema for the Rows is not known a-priori. It is contained by
>>> the Row. I am then pushing this data to pubsub:
>>> >> > ===========
>>> >> > MyData pushingData = ....
>>> >> > WhatCoder? coder = ....
>>> >> >
>>> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>>> >> > coder.encode(this, os);
>>> >> >
>>> >> > pubsubClient.connect();
>>> >> >
>>> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>>> >> > pubsubClient.close();
>>> >> > =================
>>> >> > What's the right coder to use in this case? I don't know if
>>> SchemaCoder will work, because it seems that it requires the Row's schema a
>>> priori. I have not been able to make AvroCoder work.
>>> >> >
>>> >> > Any tips?
>>> >> > Best
>>> >> > -P.
>>>
>>

Re: Choosing a coder for a class that contains a Row?

Posted by Kenneth Knowles <ke...@apache.org>.
The most challenging part, as I understand it, surrounds automatically
inferred schemas from POJOs, where Java's nondeterministic iteration order,
combined with a row's inherent ordering, means that even an identical
pipeline will need some metadata to plumb the right fields to the right
column indices.

Most relational migration management I've done incorporates explicit
migration logic along with changes to the schema. This is quite a lot more
robust, but more implementation work, than having a default policy
proto/avro/thrift style. I think there's a lot to explore here.

Kenn

On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette <bh...@google.com> wrote:

> I know Reuven has put some thought into evolving schemas, but I'm not sure
> it's documented anywhere as of now. The only documentation I've come across
> as I bump around the schema code are some comments deep in RowCoder [1].
> Essentially the current serialization format for a row includes a row
> count as a prefix so we can detect "simple" schema changes like column
> additions and deletions. When decoding a Row, if the current schema
> contains *more* fields than the encoded Row, the remaining fields are
> populated with nulls in the resulting Row object. If the current schema
> contains *fewer* fields than the encoded Row, the additional ones are
> just dropped.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
>
> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <ry...@skraba.com> wrote:
>
>> I'm also really interested in the question of evolving schemas... It's
>> something I've also put off figuring out :D
>>
>> With all its warts, the LazyAvroCoder technique (a coder backed by
>> some sort of schema registry) _could_ work with "homogeneish" data
>> (i.e. if the number of schemas in play for a single coder is much,
>> much smaller than the number of elements), even if none of the the
>> schemas are known at Pipeline construction.  The portability job
>> server (which already stores and serves artifacts for running jobs)
>> might be the right place to put a schema registry... but I'm not
>> entirely convinced it's the right way to go either.
>>
>> At the same time, "simply" bumping a known schema to a new version is
>> roughly equivalent to updating a pipeline in place.
>>
>> Sending the data as Java-serialized Rows will be equivalent to sending
>> the entire schema with every record, so it _would_ work without
>> involving a new, distributed state between one coders encode and
>> anothers decode (at the cost of message size, of course).
>>
>> Ryan
>>
>>
>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pa...@google.com> wrote:
>> >
>> > +dev
>> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
>> useful.
>> >
>> > The data is change data capture from databases, and I'm putting it into
>> a Beam Row. The schema for the Row is generally homogeneous, but subject to
>> change at some point in the future if the schema in the database changes.
>> It's unusual and unlikely, but possible. I have no idea how Beam deals with
>> evolving schemas. +Reuven Lax is there documentation / examples / anything
>> around this? : )
>> >
>> > I think evolving schemas is an interesting question....
>> >
>> > For now, I am going to Java-serialize the objects, and delay figuring
>> this out. But I reckon I'll have to come back to this...
>> >
>> > Best
>> > -P.
>> >
>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <ry...@skraba.com> wrote:
>> >>
>> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> >> pipeline construction time, but can be discovered from the instance of
>> >> MyData?
>> >>
>> >> Once discovered, is the schema "homogeneous" for all instance of
>> >> MyData?  (i.e. someRow will always have the same schema for all
>> >> instances afterwards, and there won't be another someRow with a
>> >> different schema).
>> >>
>> >> We've encountered a parallel "problem" with pure Avro data, where the
>> >> instance is a GenericRecord containing it's own Avro schema but
>> >> *without* knowing the schema until the pipeline is run.  The solution
>> >> that we've been using is a bit hacky, but we're using an ad hoc
>> >> per-job schema registry and a custom coder where each worker saves the
>> >> schema in the `encode` before writing the record, and loads it lazily
>> >> in the `decode` before reading.
>> >>
>> >> The original code is available[1] (be gentle, it was written with Beam
>> >> 0.4.0-incubating... and has continued to work until now).
>> >>
>> >> In practice, the ad hoc schema registry is just a server socket in the
>> >> Spark driver, in-memory for DirectRunner / local mode, and a a
>> >> read/write to a known location in other runners.  There are definitely
>> >> other solutions with side-inputs and providers, and the job server in
>> >> portability looks like an exciting candidate for per-job schema
>> >> registry story...
>> >>
>> >> I'm super eager to see if there are other ideas or a contribution we
>> >> can make in this area that's "Beam Row" oriented!
>> >>
>> >> Ryan
>> >>
>> >> [1]
>> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>> >>
>> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pa...@google.com>
>> wrote:
>> >> >
>> >> > Hello all,
>> >> > I am writing a utility to push data to PubSub. My data class looks
>> something like so:
>> >> > ==========
>> >> > class MyData {
>> >> >   String someId;
>> >> >   Row someRow;
>> >> >   Row someOtherRow;
>> >> > }
>> >> > ==============
>> >> > The schema for the Rows is not known a-priori. It is contained by
>> the Row. I am then pushing this data to pubsub:
>> >> > ===========
>> >> > MyData pushingData = ....
>> >> > WhatCoder? coder = ....
>> >> >
>> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>> >> > coder.encode(this, os);
>> >> >
>> >> > pubsubClient.connect();
>> >> >
>> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>> >> > pubsubClient.close();
>> >> > =================
>> >> > What's the right coder to use in this case? I don't know if
>> SchemaCoder will work, because it seems that it requires the Row's schema a
>> priori. I have not been able to make AvroCoder work.
>> >> >
>> >> > Any tips?
>> >> > Best
>> >> > -P.
>>
>

Re: Choosing a coder for a class that contains a Row?

Posted by Kenneth Knowles <ke...@apache.org>.
The most challenging part, as I understand it, surrounds automatically
inferred schemas from POJOs, where Java's nondeterministic iteration order,
combined with a row's inherent ordering, means that even an identical
pipeline will need some metadata to plumb the right fields to the right
column indices.

Most relational migration management I've done incorporates explicit
migration logic along with changes to the schema. This is quite a lot more
robust, but more implementation work, than having a default policy
proto/avro/thrift style. I think there's a lot to explore here.

Kenn

On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette <bh...@google.com> wrote:

> I know Reuven has put some thought into evolving schemas, but I'm not sure
> it's documented anywhere as of now. The only documentation I've come across
> as I bump around the schema code are some comments deep in RowCoder [1].
> Essentially the current serialization format for a row includes a row
> count as a prefix so we can detect "simple" schema changes like column
> additions and deletions. When decoding a Row, if the current schema
> contains *more* fields than the encoded Row, the remaining fields are
> populated with nulls in the resulting Row object. If the current schema
> contains *fewer* fields than the encoded Row, the additional ones are
> just dropped.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
>
> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <ry...@skraba.com> wrote:
>
>> I'm also really interested in the question of evolving schemas... It's
>> something I've also put off figuring out :D
>>
>> With all its warts, the LazyAvroCoder technique (a coder backed by
>> some sort of schema registry) _could_ work with "homogeneish" data
>> (i.e. if the number of schemas in play for a single coder is much,
>> much smaller than the number of elements), even if none of the the
>> schemas are known at Pipeline construction.  The portability job
>> server (which already stores and serves artifacts for running jobs)
>> might be the right place to put a schema registry... but I'm not
>> entirely convinced it's the right way to go either.
>>
>> At the same time, "simply" bumping a known schema to a new version is
>> roughly equivalent to updating a pipeline in place.
>>
>> Sending the data as Java-serialized Rows will be equivalent to sending
>> the entire schema with every record, so it _would_ work without
>> involving a new, distributed state between one coders encode and
>> anothers decode (at the cost of message size, of course).
>>
>> Ryan
>>
>>
>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pa...@google.com> wrote:
>> >
>> > +dev
>> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
>> useful.
>> >
>> > The data is change data capture from databases, and I'm putting it into
>> a Beam Row. The schema for the Row is generally homogeneous, but subject to
>> change at some point in the future if the schema in the database changes.
>> It's unusual and unlikely, but possible. I have no idea how Beam deals with
>> evolving schemas. +Reuven Lax is there documentation / examples / anything
>> around this? : )
>> >
>> > I think evolving schemas is an interesting question....
>> >
>> > For now, I am going to Java-serialize the objects, and delay figuring
>> this out. But I reckon I'll have to come back to this...
>> >
>> > Best
>> > -P.
>> >
>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <ry...@skraba.com> wrote:
>> >>
>> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> >> pipeline construction time, but can be discovered from the instance of
>> >> MyData?
>> >>
>> >> Once discovered, is the schema "homogeneous" for all instance of
>> >> MyData?  (i.e. someRow will always have the same schema for all
>> >> instances afterwards, and there won't be another someRow with a
>> >> different schema).
>> >>
>> >> We've encountered a parallel "problem" with pure Avro data, where the
>> >> instance is a GenericRecord containing it's own Avro schema but
>> >> *without* knowing the schema until the pipeline is run.  The solution
>> >> that we've been using is a bit hacky, but we're using an ad hoc
>> >> per-job schema registry and a custom coder where each worker saves the
>> >> schema in the `encode` before writing the record, and loads it lazily
>> >> in the `decode` before reading.
>> >>
>> >> The original code is available[1] (be gentle, it was written with Beam
>> >> 0.4.0-incubating... and has continued to work until now).
>> >>
>> >> In practice, the ad hoc schema registry is just a server socket in the
>> >> Spark driver, in-memory for DirectRunner / local mode, and a a
>> >> read/write to a known location in other runners.  There are definitely
>> >> other solutions with side-inputs and providers, and the job server in
>> >> portability looks like an exciting candidate for per-job schema
>> >> registry story...
>> >>
>> >> I'm super eager to see if there are other ideas or a contribution we
>> >> can make in this area that's "Beam Row" oriented!
>> >>
>> >> Ryan
>> >>
>> >> [1]
>> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>> >>
>> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pa...@google.com>
>> wrote:
>> >> >
>> >> > Hello all,
>> >> > I am writing a utility to push data to PubSub. My data class looks
>> something like so:
>> >> > ==========
>> >> > class MyData {
>> >> >   String someId;
>> >> >   Row someRow;
>> >> >   Row someOtherRow;
>> >> > }
>> >> > ==============
>> >> > The schema for the Rows is not known a-priori. It is contained by
>> the Row. I am then pushing this data to pubsub:
>> >> > ===========
>> >> > MyData pushingData = ....
>> >> > WhatCoder? coder = ....
>> >> >
>> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>> >> > coder.encode(this, os);
>> >> >
>> >> > pubsubClient.connect();
>> >> >
>> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>> >> > pubsubClient.close();
>> >> > =================
>> >> > What's the right coder to use in this case? I don't know if
>> SchemaCoder will work, because it seems that it requires the Row's schema a
>> priori. I have not been able to make AvroCoder work.
>> >> >
>> >> > Any tips?
>> >> > Best
>> >> > -P.
>>
>

Re: Choosing a coder for a class that contains a Row?

Posted by Brian Hulette <bh...@google.com>.
I know Reuven has put some thought into evolving schemas, but I'm not sure
it's documented anywhere as of now. The only documentation I've come across
as I bump around the schema code are some comments deep in RowCoder [1].
Essentially the current serialization format for a row includes a row count
as a prefix so we can detect "simple" schema changes like column additions
and deletions. When decoding a Row, if the current schema contains
*more* fields
than the encoded Row, the remaining fields are populated with nulls in the
resulting Row object. If the current schema contains *fewer* fields than
the encoded Row, the additional ones are just dropped.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296

On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <ry...@skraba.com> wrote:

> I'm also really interested in the question of evolving schemas... It's
> something I've also put off figuring out :D
>
> With all its warts, the LazyAvroCoder technique (a coder backed by
> some sort of schema registry) _could_ work with "homogeneish" data
> (i.e. if the number of schemas in play for a single coder is much,
> much smaller than the number of elements), even if none of the the
> schemas are known at Pipeline construction.  The portability job
> server (which already stores and serves artifacts for running jobs)
> might be the right place to put a schema registry... but I'm not
> entirely convinced it's the right way to go either.
>
> At the same time, "simply" bumping a known schema to a new version is
> roughly equivalent to updating a pipeline in place.
>
> Sending the data as Java-serialized Rows will be equivalent to sending
> the entire schema with every record, so it _would_ work without
> involving a new, distributed state between one coders encode and
> anothers decode (at the cost of message size, of course).
>
> Ryan
>
>
> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pa...@google.com> wrote:
> >
> > +dev
> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
> useful.
> >
> > The data is change data capture from databases, and I'm putting it into
> a Beam Row. The schema for the Row is generally homogeneous, but subject to
> change at some point in the future if the schema in the database changes.
> It's unusual and unlikely, but possible. I have no idea how Beam deals with
> evolving schemas. +Reuven Lax is there documentation / examples / anything
> around this? : )
> >
> > I think evolving schemas is an interesting question....
> >
> > For now, I am going to Java-serialize the objects, and delay figuring
> this out. But I reckon I'll have to come back to this...
> >
> > Best
> > -P.
> >
> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <ry...@skraba.com> wrote:
> >>
> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
> >> pipeline construction time, but can be discovered from the instance of
> >> MyData?
> >>
> >> Once discovered, is the schema "homogeneous" for all instance of
> >> MyData?  (i.e. someRow will always have the same schema for all
> >> instances afterwards, and there won't be another someRow with a
> >> different schema).
> >>
> >> We've encountered a parallel "problem" with pure Avro data, where the
> >> instance is a GenericRecord containing it's own Avro schema but
> >> *without* knowing the schema until the pipeline is run.  The solution
> >> that we've been using is a bit hacky, but we're using an ad hoc
> >> per-job schema registry and a custom coder where each worker saves the
> >> schema in the `encode` before writing the record, and loads it lazily
> >> in the `decode` before reading.
> >>
> >> The original code is available[1] (be gentle, it was written with Beam
> >> 0.4.0-incubating... and has continued to work until now).
> >>
> >> In practice, the ad hoc schema registry is just a server socket in the
> >> Spark driver, in-memory for DirectRunner / local mode, and a a
> >> read/write to a known location in other runners.  There are definitely
> >> other solutions with side-inputs and providers, and the job server in
> >> portability looks like an exciting candidate for per-job schema
> >> registry story...
> >>
> >> I'm super eager to see if there are other ideas or a contribution we
> >> can make in this area that's "Beam Row" oriented!
> >>
> >> Ryan
> >>
> >> [1]
> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
> >>
> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pa...@google.com>
> wrote:
> >> >
> >> > Hello all,
> >> > I am writing a utility to push data to PubSub. My data class looks
> something like so:
> >> > ==========
> >> > class MyData {
> >> >   String someId;
> >> >   Row someRow;
> >> >   Row someOtherRow;
> >> > }
> >> > ==============
> >> > The schema for the Rows is not known a-priori. It is contained by the
> Row. I am then pushing this data to pubsub:
> >> > ===========
> >> > MyData pushingData = ....
> >> > WhatCoder? coder = ....
> >> >
> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
> >> > coder.encode(this, os);
> >> >
> >> > pubsubClient.connect();
> >> >
> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
> >> > pubsubClient.close();
> >> > =================
> >> > What's the right coder to use in this case? I don't know if
> SchemaCoder will work, because it seems that it requires the Row's schema a
> priori. I have not been able to make AvroCoder work.
> >> >
> >> > Any tips?
> >> > Best
> >> > -P.
>

Re: Choosing a coder for a class that contains a Row?

Posted by Ryan Skraba <ry...@skraba.com>.
I'm also really interested in the question of evolving schemas... It's
something I've also put off figuring out :D

With all its warts, the LazyAvroCoder technique (a coder backed by
some sort of schema registry) _could_ work with "homogeneish" data
(i.e. if the number of schemas in play for a single coder is much,
much smaller than the number of elements), even if none of the the
schemas are known at Pipeline construction.  The portability job
server (which already stores and serves artifacts for running jobs)
might be the right place to put a schema registry... but I'm not
entirely convinced it's the right way to go either.

At the same time, "simply" bumping a known schema to a new version is
roughly equivalent to updating a pipeline in place.

Sending the data as Java-serialized Rows will be equivalent to sending
the entire schema with every record, so it _would_ work without
involving a new, distributed state between one coders encode and
anothers decode (at the cost of message size, of course).

Ryan


On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pa...@google.com> wrote:
>
> +dev
> Thanks Ryan! This is quite helpful. Still not what I need : ) - but useful.
>
> The data is change data capture from databases, and I'm putting it into a Beam Row. The schema for the Row is generally homogeneous, but subject to change at some point in the future if the schema in the database changes. It's unusual and unlikely, but possible. I have no idea how Beam deals with evolving schemas. +Reuven Lax is there documentation / examples / anything around this? : )
>
> I think evolving schemas is an interesting question....
>
> For now, I am going to Java-serialize the objects, and delay figuring this out. But I reckon I'll have to come back to this...
>
> Best
> -P.
>
> On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <ry...@skraba.com> wrote:
>>
>> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> pipeline construction time, but can be discovered from the instance of
>> MyData?
>>
>> Once discovered, is the schema "homogeneous" for all instance of
>> MyData?  (i.e. someRow will always have the same schema for all
>> instances afterwards, and there won't be another someRow with a
>> different schema).
>>
>> We've encountered a parallel "problem" with pure Avro data, where the
>> instance is a GenericRecord containing it's own Avro schema but
>> *without* knowing the schema until the pipeline is run.  The solution
>> that we've been using is a bit hacky, but we're using an ad hoc
>> per-job schema registry and a custom coder where each worker saves the
>> schema in the `encode` before writing the record, and loads it lazily
>> in the `decode` before reading.
>>
>> The original code is available[1] (be gentle, it was written with Beam
>> 0.4.0-incubating... and has continued to work until now).
>>
>> In practice, the ad hoc schema registry is just a server socket in the
>> Spark driver, in-memory for DirectRunner / local mode, and a a
>> read/write to a known location in other runners.  There are definitely
>> other solutions with side-inputs and providers, and the job server in
>> portability looks like an exciting candidate for per-job schema
>> registry story...
>>
>> I'm super eager to see if there are other ideas or a contribution we
>> can make in this area that's "Beam Row" oriented!
>>
>> Ryan
>>
>> [1] https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>>
>> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pa...@google.com> wrote:
>> >
>> > Hello all,
>> > I am writing a utility to push data to PubSub. My data class looks something like so:
>> > ==========
>> > class MyData {
>> >   String someId;
>> >   Row someRow;
>> >   Row someOtherRow;
>> > }
>> > ==============
>> > The schema for the Rows is not known a-priori. It is contained by the Row. I am then pushing this data to pubsub:
>> > ===========
>> > MyData pushingData = ....
>> > WhatCoder? coder = ....
>> >
>> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>> > coder.encode(this, os);
>> >
>> > pubsubClient.connect();
>> > pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>> > pubsubClient.close();
>> > =================
>> > What's the right coder to use in this case? I don't know if SchemaCoder will work, because it seems that it requires the Row's schema a priori. I have not been able to make AvroCoder work.
>> >
>> > Any tips?
>> > Best
>> > -P.

Re: Choosing a coder for a class that contains a Row?

Posted by Ryan Skraba <ry...@skraba.com>.
I'm also really interested in the question of evolving schemas... It's
something I've also put off figuring out :D

With all its warts, the LazyAvroCoder technique (a coder backed by
some sort of schema registry) _could_ work with "homogeneish" data
(i.e. if the number of schemas in play for a single coder is much,
much smaller than the number of elements), even if none of the the
schemas are known at Pipeline construction.  The portability job
server (which already stores and serves artifacts for running jobs)
might be the right place to put a schema registry... but I'm not
entirely convinced it's the right way to go either.

At the same time, "simply" bumping a known schema to a new version is
roughly equivalent to updating a pipeline in place.

Sending the data as Java-serialized Rows will be equivalent to sending
the entire schema with every record, so it _would_ work without
involving a new, distributed state between one coders encode and
anothers decode (at the cost of message size, of course).

Ryan


On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pa...@google.com> wrote:
>
> +dev
> Thanks Ryan! This is quite helpful. Still not what I need : ) - but useful.
>
> The data is change data capture from databases, and I'm putting it into a Beam Row. The schema for the Row is generally homogeneous, but subject to change at some point in the future if the schema in the database changes. It's unusual and unlikely, but possible. I have no idea how Beam deals with evolving schemas. +Reuven Lax is there documentation / examples / anything around this? : )
>
> I think evolving schemas is an interesting question....
>
> For now, I am going to Java-serialize the objects, and delay figuring this out. But I reckon I'll have to come back to this...
>
> Best
> -P.
>
> On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <ry...@skraba.com> wrote:
>>
>> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> pipeline construction time, but can be discovered from the instance of
>> MyData?
>>
>> Once discovered, is the schema "homogeneous" for all instance of
>> MyData?  (i.e. someRow will always have the same schema for all
>> instances afterwards, and there won't be another someRow with a
>> different schema).
>>
>> We've encountered a parallel "problem" with pure Avro data, where the
>> instance is a GenericRecord containing it's own Avro schema but
>> *without* knowing the schema until the pipeline is run.  The solution
>> that we've been using is a bit hacky, but we're using an ad hoc
>> per-job schema registry and a custom coder where each worker saves the
>> schema in the `encode` before writing the record, and loads it lazily
>> in the `decode` before reading.
>>
>> The original code is available[1] (be gentle, it was written with Beam
>> 0.4.0-incubating... and has continued to work until now).
>>
>> In practice, the ad hoc schema registry is just a server socket in the
>> Spark driver, in-memory for DirectRunner / local mode, and a a
>> read/write to a known location in other runners.  There are definitely
>> other solutions with side-inputs and providers, and the job server in
>> portability looks like an exciting candidate for per-job schema
>> registry story...
>>
>> I'm super eager to see if there are other ideas or a contribution we
>> can make in this area that's "Beam Row" oriented!
>>
>> Ryan
>>
>> [1] https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>>
>> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pa...@google.com> wrote:
>> >
>> > Hello all,
>> > I am writing a utility to push data to PubSub. My data class looks something like so:
>> > ==========
>> > class MyData {
>> >   String someId;
>> >   Row someRow;
>> >   Row someOtherRow;
>> > }
>> > ==============
>> > The schema for the Rows is not known a-priori. It is contained by the Row. I am then pushing this data to pubsub:
>> > ===========
>> > MyData pushingData = ....
>> > WhatCoder? coder = ....
>> >
>> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>> > coder.encode(this, os);
>> >
>> > pubsubClient.connect();
>> > pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>> > pubsubClient.close();
>> > =================
>> > What's the right coder to use in this case? I don't know if SchemaCoder will work, because it seems that it requires the Row's schema a priori. I have not been able to make AvroCoder work.
>> >
>> > Any tips?
>> > Best
>> > -P.