You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Garry Turkington <g....@improvedigital.com> on 2014/02/13 14:27:00 UTC

Thinking about message formats

Hi,

I was thinking about how best to do testing on Samza jobs. The ability to replay streams appears to help a lot here as by pushing some data into the consumed streams then rewinding it is always possible to get the same data fed through the tasks. So that helps a lot in terms of dealing with known input data and such.

But then I started thinking about message format evolution over time which in honesty wasn't something I had considered before. My primary use cases for Samza are pulling apart lots of log files as they arrive so the obvious thing is to push each record/line as a single message. The problem of course is that as those log formats evolve over  time (almost always by having new columns added) that I need change both the ingest mechanism and the Samza tasks; firstly just not to be broken by the new format, secondly to actually use the additional columns if appropriate.

At which point Avro seems to have lots of value as a message format, we're moving to use it elsewhere in the data backend for very similar reasons of ability to manage schema evolution.

Anyone went down this path at all? I guess there are two  approaches, just have Samza treat the Avro message as a string and have each task parse and extract the fields of interest or to build an Avro serde that delivers an Avro record object in the envelope.

Thanks
Garry

...........................................................................................................................
Garry Turkington | CTO | +44-7871315944 | skypeGarryTurkington:
Improve Digital - Real time advertising technology
A company of PubliGroupe

cid:image001.png@01CECF2E.EF4FE940


Re: Thinking about message formats

Posted by Martin Kleppmann <mk...@linkedin.com>.
Hi Garry,

Yes, that issue often trips up people who are new to Avro. It's also Avro's main difference to Thrift and Protocol Buffers, which use a different approach to schema evolution. This blog post may be helpful for understanding the different approaches:
http://martin.kleppmann.com/2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html

In your case, if you're not already committed to Avro for other reasons, then Protocol Buffers or Thrift may work slightly better for you. They include an identification tag on each field name, and when a parser encounters a field with a tag it doesn't know, that field is ignored. This approach means the Samza job doesn't have to know about the latest schema, and all messages simply get interpreted as the version of the schema that was compiled into the Samza job.

Cheers,
Martin

On 13 Feb 2014, at 17:31, Jakob Homan <jg...@gmail.com> wrote:
> Yeah, schema evolution for never-ending jobs has been a bit annoying so
> far.  So long as the Samza job has the latest version of the schema for
> producing and the messages arrive as generic records with their schema
> intact, one can transcode the newly arrived messages to the latest schema.
> This is what we're doing.  It's a manual step and I'd rather if Avro
> better supported this automatically, but it works for now.  I've been
> meaning to clean up the code and put it into Samza proper in a util package
> or something.
> 
> This approach does have a couple issues:
> (1) If there is a new version of the schema, one needs to restart the job
> with the latest version.  It could be possible to have another stream that
> published schema changes (getting a bit meta here) and the job could
> subscribe to that, updating as necessary.  Making sure everything hits in
> the right order is a bit tricky.  Alternatively, one could look for schema
> changes and assume any new one is the latest one, reconfiguring as
> necessary.  So far we've been using the restart-the-job approach.
> (2) Transcoding effectively bumps each message up to the newest version as
> they traverse the Samza pipelines.  This can be a good thing or not,
> depending on what you're looking for.  I don't think there's a one true
> correct approach to online schema evolution; it'll come down to a policy
> decision.
> -jg
> 
> 
> 
> On Thu, Feb 13, 2014 at 7:12 AM, Garry Turkington <
> g.turkington@improvedigital.com> wrote:
> 
>> Hi Martin,
>> 
>> Thanks for the input, and don't worry, it did make sense. :)
>> 
>> Though it did identify a hole in my understanding of Avro; I think I've
>> been spoiled using container files that included the schema. I thought that
>> a reader could process an Avro message with a previous version of the
>> schema, I didn't know it needed to know the exact schema with which the
>> message was written.
>> 
>> I normally have a new column added to my input files, often with a default
>> value and then afterwards add the specific processing that actually uses
>> the new data. So in the first case I don't care if the reader doesn't see
>> the new column, I just don't want it to break when newer version messages
>> come along. But sounds like it's not quite that straightforward and before
>> any consumer receives any messages written with a new schema that I'll need
>> make that new schema available to the consumer in some way.
>> 
>> Thanks,
>> Garry
>> 
>> -----Original Message-----
>> From: Martin Kleppmann [mailto:mkleppmann@linkedin.com]
>> Sent: 13 February 2014 15:11
>> To: <de...@samza.incubator.apache.org>
>> Subject: Re: Thinking about message formats
>> 
>> Hi Garry,
>> 
>> We use Avro a lot, and it works well with Samza. Schema evolution is very
>> good thing to have in your toolbox.
>> 
>> One thing to keep in mind with Avro: in order to parse a message, you need
>> to know the exact schema with which the data was written. You may have
>> multiple different producers writing messages with different schemas, so
>> the messages need to be tagged with the schema version they're using. The
>> consumer (Samza job) can then use schema evolution to map all messages to
>> the same schema (e.g. the latest version of the schema).
>> 
>> There currently isn't a standard way for tagging an individual
>> Avro-encoded message with the schema that was used to encode it. The
>> simplest you could do is to give every version of your schema a unique
>> version number, and to prefix every message in Kafka with the schema
>> version number. The version number could be a hash of the schema, or a
>> sequential number that you assign manually.
>> 
>> You can then compile the mapping from version numbers to Avro schemas into
>> your Samza job. For every message that comes in, you look at the version
>> number prefix, and parse the rest of the message with the appropriate
>> schema. (However, with this approach you need to deploy the Samza job to be
>> aware of any new schema version before any producer starts generating
>> messages in the new schema.) Alternatively, you can keep the mapping from
>> version numbers to schemas in a separate service, and look up version
>> numbers on demand. This has been discussed in
>> https://issues.apache.org/jira/browse/AVRO-1124 but afaik is not yet
>> publicly available.
>> 
>> Is this making any sense? I fear it sounds a bit confusing.
>> 
>> Martin
>> 
>> On 13 Feb 2014, at 13:27, Garry Turkington <
>> g.turkington@improvedigital.com> wrote:
>>> Hi,
>>> 
>>> I was thinking about how best to do testing on Samza jobs. The ability
>> to replay streams appears to help a lot here as by pushing some data into
>> the consumed streams then rewinding it is always possible to get the same
>> data fed through the tasks. So that helps a lot in terms of dealing with
>> known input data and such.
>>> 
>>> But then I started thinking about message format evolution over time
>> which in honesty wasn't something I had considered before. My primary use
>> cases for Samza are pulling apart lots of log files as they arrive so the
>> obvious thing is to push each record/line as a single message. The problem
>> of course is that as those log formats evolve over  time (almost always by
>> having new columns added) that I need change both the ingest mechanism and
>> the Samza tasks; firstly just not to be broken by the new format, secondly
>> to actually use the additional columns if appropriate.
>>> 
>>> At which point Avro seems to have lots of value as a message format,
>> we're moving to use it elsewhere in the data backend for very similar
>> reasons of ability to manage schema evolution.
>>> 
>>> Anyone went down this path at all? I guess there are two  approaches,
>> just have Samza treat the Avro message as a string and have each task parse
>> and extract the fields of interest or to build an Avro serde that delivers
>> an Avro record object in the envelope.
>>> 
>>> Thanks
>>> Garry
>>> 
>>> 
>> ...........................................................................................................................
>>> Garry Turkington | CTO | +44-7871315944 | skypeGarryTurkington:
>>> Improve Digital - Real time advertising technology A company of
>>> PubliGroupe
>>> 
>>> cid:image001.png@01CECF2E.EF4FE940
>>> 
>> 
>> 
>> -----
>> No virus found in this message.
>> Checked by AVG - www.avg.com
>> Version: 2014.0.4259 / Virus Database: 3697/7086 - Release Date: 02/12/14
>> 


Re: Thinking about message formats

Posted by Jakob Homan <jg...@gmail.com>.
Yeah, schema evolution for never-ending jobs has been a bit annoying so
far.  So long as the Samza job has the latest version of the schema for
producing and the messages arrive as generic records with their schema
intact, one can transcode the newly arrived messages to the latest schema.
 This is what we're doing.  It's a manual step and I'd rather if Avro
better supported this automatically, but it works for now.  I've been
meaning to clean up the code and put it into Samza proper in a util package
or something.

This approach does have a couple issues:
(1) If there is a new version of the schema, one needs to restart the job
with the latest version.  It could be possible to have another stream that
published schema changes (getting a bit meta here) and the job could
subscribe to that, updating as necessary.  Making sure everything hits in
the right order is a bit tricky.  Alternatively, one could look for schema
changes and assume any new one is the latest one, reconfiguring as
necessary.  So far we've been using the restart-the-job approach.
(2) Transcoding effectively bumps each message up to the newest version as
they traverse the Samza pipelines.  This can be a good thing or not,
depending on what you're looking for.  I don't think there's a one true
correct approach to online schema evolution; it'll come down to a policy
decision.
-jg



On Thu, Feb 13, 2014 at 7:12 AM, Garry Turkington <
g.turkington@improvedigital.com> wrote:

> Hi Martin,
>
> Thanks for the input, and don't worry, it did make sense. :)
>
> Though it did identify a hole in my understanding of Avro; I think I've
> been spoiled using container files that included the schema. I thought that
> a reader could process an Avro message with a previous version of the
> schema, I didn't know it needed to know the exact schema with which the
> message was written.
>
> I normally have a new column added to my input files, often with a default
> value and then afterwards add the specific processing that actually uses
> the new data. So in the first case I don't care if the reader doesn't see
> the new column, I just don't want it to break when newer version messages
> come along. But sounds like it's not quite that straightforward and before
> any consumer receives any messages written with a new schema that I'll need
> make that new schema available to the consumer in some way.
>
> Thanks,
> Garry
>
> -----Original Message-----
> From: Martin Kleppmann [mailto:mkleppmann@linkedin.com]
> Sent: 13 February 2014 15:11
> To: <de...@samza.incubator.apache.org>
> Subject: Re: Thinking about message formats
>
> Hi Garry,
>
> We use Avro a lot, and it works well with Samza. Schema evolution is very
> good thing to have in your toolbox.
>
> One thing to keep in mind with Avro: in order to parse a message, you need
> to know the exact schema with which the data was written. You may have
> multiple different producers writing messages with different schemas, so
> the messages need to be tagged with the schema version they're using. The
> consumer (Samza job) can then use schema evolution to map all messages to
> the same schema (e.g. the latest version of the schema).
>
> There currently isn't a standard way for tagging an individual
> Avro-encoded message with the schema that was used to encode it. The
> simplest you could do is to give every version of your schema a unique
> version number, and to prefix every message in Kafka with the schema
> version number. The version number could be a hash of the schema, or a
> sequential number that you assign manually.
>
> You can then compile the mapping from version numbers to Avro schemas into
> your Samza job. For every message that comes in, you look at the version
> number prefix, and parse the rest of the message with the appropriate
> schema. (However, with this approach you need to deploy the Samza job to be
> aware of any new schema version before any producer starts generating
> messages in the new schema.) Alternatively, you can keep the mapping from
> version numbers to schemas in a separate service, and look up version
> numbers on demand. This has been discussed in
> https://issues.apache.org/jira/browse/AVRO-1124 but afaik is not yet
> publicly available.
>
> Is this making any sense? I fear it sounds a bit confusing.
>
> Martin
>
> On 13 Feb 2014, at 13:27, Garry Turkington <
> g.turkington@improvedigital.com> wrote:
> > Hi,
> >
> > I was thinking about how best to do testing on Samza jobs. The ability
> to replay streams appears to help a lot here as by pushing some data into
> the consumed streams then rewinding it is always possible to get the same
> data fed through the tasks. So that helps a lot in terms of dealing with
> known input data and such.
> >
> > But then I started thinking about message format evolution over time
> which in honesty wasn't something I had considered before. My primary use
> cases for Samza are pulling apart lots of log files as they arrive so the
> obvious thing is to push each record/line as a single message. The problem
> of course is that as those log formats evolve over  time (almost always by
> having new columns added) that I need change both the ingest mechanism and
> the Samza tasks; firstly just not to be broken by the new format, secondly
> to actually use the additional columns if appropriate.
> >
> > At which point Avro seems to have lots of value as a message format,
> we're moving to use it elsewhere in the data backend for very similar
> reasons of ability to manage schema evolution.
> >
> > Anyone went down this path at all? I guess there are two  approaches,
> just have Samza treat the Avro message as a string and have each task parse
> and extract the fields of interest or to build an Avro serde that delivers
> an Avro record object in the envelope.
> >
> > Thanks
> > Garry
> >
> >
> ...........................................................................................................................
> > Garry Turkington | CTO | +44-7871315944 | skypeGarryTurkington:
> > Improve Digital - Real time advertising technology A company of
> > PubliGroupe
> >
> > cid:image001.png@01CECF2E.EF4FE940
> >
>
>
> -----
> No virus found in this message.
> Checked by AVG - www.avg.com
> Version: 2014.0.4259 / Virus Database: 3697/7086 - Release Date: 02/12/14
>

RE: Thinking about message formats

Posted by Garry Turkington <g....@improvedigital.com>.
Hi Martin,

Thanks for the input, and don't worry, it did make sense. :)

Though it did identify a hole in my understanding of Avro; I think I've been spoiled using container files that included the schema. I thought that a reader could process an Avro message with a previous version of the schema, I didn't know it needed to know the exact schema with which the message was written.

I normally have a new column added to my input files, often with a default value and then afterwards add the specific processing that actually uses the new data. So in the first case I don't care if the reader doesn't see the new column, I just don't want it to break when newer version messages come along. But sounds like it's not quite that straightforward and before any consumer receives any messages written with a new schema that I'll need make that new schema available to the consumer in some way.

Thanks,
Garry

-----Original Message-----
From: Martin Kleppmann [mailto:mkleppmann@linkedin.com] 
Sent: 13 February 2014 15:11
To: <de...@samza.incubator.apache.org>
Subject: Re: Thinking about message formats

Hi Garry,

We use Avro a lot, and it works well with Samza. Schema evolution is very good thing to have in your toolbox.

One thing to keep in mind with Avro: in order to parse a message, you need to know the exact schema with which the data was written. You may have multiple different producers writing messages with different schemas, so the messages need to be tagged with the schema version they're using. The consumer (Samza job) can then use schema evolution to map all messages to the same schema (e.g. the latest version of the schema).

There currently isn't a standard way for tagging an individual Avro-encoded message with the schema that was used to encode it. The simplest you could do is to give every version of your schema a unique version number, and to prefix every message in Kafka with the schema version number. The version number could be a hash of the schema, or a sequential number that you assign manually.

You can then compile the mapping from version numbers to Avro schemas into your Samza job. For every message that comes in, you look at the version number prefix, and parse the rest of the message with the appropriate schema. (However, with this approach you need to deploy the Samza job to be aware of any new schema version before any producer starts generating messages in the new schema.) Alternatively, you can keep the mapping from version numbers to schemas in a separate service, and look up version numbers on demand. This has been discussed in https://issues.apache.org/jira/browse/AVRO-1124 but afaik is not yet publicly available.

Is this making any sense? I fear it sounds a bit confusing.

Martin

On 13 Feb 2014, at 13:27, Garry Turkington <g....@improvedigital.com> wrote:
> Hi,
> 
> I was thinking about how best to do testing on Samza jobs. The ability to replay streams appears to help a lot here as by pushing some data into the consumed streams then rewinding it is always possible to get the same data fed through the tasks. So that helps a lot in terms of dealing with known input data and such.
> 
> But then I started thinking about message format evolution over time which in honesty wasn't something I had considered before. My primary use cases for Samza are pulling apart lots of log files as they arrive so the obvious thing is to push each record/line as a single message. The problem of course is that as those log formats evolve over  time (almost always by having new columns added) that I need change both the ingest mechanism and the Samza tasks; firstly just not to be broken by the new format, secondly to actually use the additional columns if appropriate.
> 
> At which point Avro seems to have lots of value as a message format, we're moving to use it elsewhere in the data backend for very similar reasons of ability to manage schema evolution.
> 
> Anyone went down this path at all? I guess there are two  approaches, just have Samza treat the Avro message as a string and have each task parse and extract the fields of interest or to build an Avro serde that delivers an Avro record object in the envelope.
> 
> Thanks
> Garry
> 
> ...........................................................................................................................
> Garry Turkington | CTO | +44-7871315944 | skypeGarryTurkington:
> Improve Digital - Real time advertising technology A company of 
> PubliGroupe
> 
> cid:image001.png@01CECF2E.EF4FE940
> 


-----
No virus found in this message.
Checked by AVG - www.avg.com
Version: 2014.0.4259 / Virus Database: 3697/7086 - Release Date: 02/12/14

Re: Thinking about message formats

Posted by Martin Kleppmann <mk...@linkedin.com>.
Hi Garry,

We use Avro a lot, and it works well with Samza. Schema evolution is very good thing to have in your toolbox.

One thing to keep in mind with Avro: in order to parse a message, you need to know the exact schema with which the data was written. You may have multiple different producers writing messages with different schemas, so the messages need to be tagged with the schema version they're using. The consumer (Samza job) can then use schema evolution to map all messages to the same schema (e.g. the latest version of the schema).

There currently isn't a standard way for tagging an individual Avro-encoded message with the schema that was used to encode it. The simplest you could do is to give every version of your schema a unique version number, and to prefix every message in Kafka with the schema version number. The version number could be a hash of the schema, or a sequential number that you assign manually.

You can then compile the mapping from version numbers to Avro schemas into your Samza job. For every message that comes in, you look at the version number prefix, and parse the rest of the message with the appropriate schema. (However, with this approach you need to deploy the Samza job to be aware of any new schema version before any producer starts generating messages in the new schema.) Alternatively, you can keep the mapping from version numbers to schemas in a separate service, and look up version numbers on demand. This has been discussed in https://issues.apache.org/jira/browse/AVRO-1124 but afaik is not yet publicly available.

Is this making any sense? I fear it sounds a bit confusing.

Martin

On 13 Feb 2014, at 13:27, Garry Turkington <g....@improvedigital.com> wrote:
> Hi,
> 
> I was thinking about how best to do testing on Samza jobs. The ability to replay streams appears to help a lot here as by pushing some data into the consumed streams then rewinding it is always possible to get the same data fed through the tasks. So that helps a lot in terms of dealing with known input data and such.
> 
> But then I started thinking about message format evolution over time which in honesty wasn't something I had considered before. My primary use cases for Samza are pulling apart lots of log files as they arrive so the obvious thing is to push each record/line as a single message. The problem of course is that as those log formats evolve over  time (almost always by having new columns added) that I need change both the ingest mechanism and the Samza tasks; firstly just not to be broken by the new format, secondly to actually use the additional columns if appropriate.
> 
> At which point Avro seems to have lots of value as a message format, we're moving to use it elsewhere in the data backend for very similar reasons of ability to manage schema evolution.
> 
> Anyone went down this path at all? I guess there are two  approaches, just have Samza treat the Avro message as a string and have each task parse and extract the fields of interest or to build an Avro serde that delivers an Avro record object in the envelope.
> 
> Thanks
> Garry
> 
> ...........................................................................................................................
> Garry Turkington | CTO | +44-7871315944 | skypeGarryTurkington:
> Improve Digital - Real time advertising technology
> A company of PubliGroupe
> 
> cid:image001.png@01CECF2E.EF4FE940
>