Posted to users@kafka.apache.org by Adrian McCague <Ad...@zopa.com> on 2017/03/15 13:05:14 UTC

Streams 0.10.2.0 + RocksDB + Avro

Hi all,

We are getting collisions with subject names in our schema registry due to state stores that are holding Avro events:

  "KSTREAM-JOINOTHER-0000000007-store-value",
  "KSTREAM-JOINOTHER-0000000006-store-value",
  "KSTREAM-JOINOTHER-0000000005-store-value",
  "KSTREAM-OUTEROTHER-0000000005-store-value",
  "KSTREAM-OUTEROTHER-0000000006-store-value",
  "KSTREAM-JOINTHIS-0000000005-store-value",
  "KSTREAM-JOINTHIS-0000000004-store-value",
  "KSTREAM-JOINTHIS-0000000006-store-value",

As you can see, these are not prefixed with their application id, so any similarly constructed topology in another Streams application that holds different events will lead to conflicting schemas. Is this a bug, intended behaviour, or user error?
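
For illustration, here is a minimal sketch of why such names can collide. It assumes the registry subject is derived as "<name>-value" (or "<name>-key") and that the serde receives the bare state store name. The in-memory map is only a hypothetical stand-in for the schema registry, the schema names are invented, and a real registry applies compatibility checks rather than a simple equality test.

```java
import java.util.HashMap;
import java.util.Map;

final class SubjectCollisionSketch {
    // Assumed naming rule: "<name>-key" for keys, "<name>-value" for values.
    static String subject(String name, boolean isKey) {
        return name + (isKey ? "-key" : "-value");
    }

    // Hypothetical stand-in for registration: returns false when the subject
    // is already bound to a different schema name.
    static boolean register(Map<String, String> registry, String subject, String schemaName) {
        String existing = registry.putIfAbsent(subject, schemaName);
        return existing == null || existing.equals(schemaName);
    }

    public static void main(String[] args) {
        Map<String, String> registry = new HashMap<>();
        String store = "KSTREAM-JOINTHIS-0000000004-store";

        // Application A registers its event type under the store-derived subject.
        boolean first = register(registry, subject(store, false), "com.example.a.OrderEvent");
        // Application B has the same generated store name but a different event type.
        boolean second = register(registry, subject(store, false), "com.example.b.PaymentEvent");

        System.out.println(first);   // true
        System.out.println(second);  // false: same subject, different schema
    }
}
```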

I can see an advantage in the schema registry getting involved here, but not if this happens. I see a few workarounds, but I am unsure whether any are recommended or whether there is something I have missed:

  1.  Use an ObjectSerde / Serializable (though we would then lose the Avro compatibility goodness)
  2.  Create a modified SpecificAvroSerde that includes the schema with each message
  3.  Is there any way to influence the subject name?
  4.  Some form of local persistent schema registry for this purpose

I am not sure whether anything changed in how Serdes are passed to RocksDB; I intend to deep dive into the source at my next opportunity.
I did notice that non-Avro messages in our topologies started throwing exceptions (not instances of SpecificRecord) since we moved to 0.10.2.0, so I suspect there was previously a plain Object serializer and the stores now use the topology defaults if you do not supply a serde.

Thanks
Adrian

RE: Streams 0.10.2.0 + RocksDB + Avro

Posted by Adrian McCague <Ad...@zopa.com>.
Hi Damian,

That is the SerDe we are using. Agreed, that looks like a good modification to make here for a state store version.
I would add that it is a good idea to include the record type as well, since an edit of the topology arrangement could still lead to issues down the line.
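
As a rough sketch of that idea: the subject could combine the application id, the store name, and the record's full name. The method name and the exact naming scheme below are hypothetical and only illustrate the shape of such a subject, they are not an existing API.

```java
final class RecordScopedSubjectSketch {
    // Hypothetical scheme: "<applicationId>-<storeName>-<recordFullName>-key|value".
    static String subject(String applicationId, String storeName,
                          String recordFullName, boolean isKey) {
        return applicationId + "-" + storeName + "-" + recordFullName
                + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        // Example values are made up for illustration.
        System.out.println(subject("my-app", "KSTREAM-JOINTHIS-0000000004-store",
                "com.example.OrderEvent", false));
    }
}
```

With the record name in the subject, renumbered store names after a topology edit would map to new subjects per record type rather than to another type's existing subject.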

Thank you for your response. I will create a JIRA in due course.
Adrian

-----Original Message-----
From: Damian Guy [mailto:damian.guy@gmail.com] 
Sent: 15 March 2017 15:00
To: users@kafka.apache.org
Subject: Re: Streams 0.10.2.0 + RocksDB + Avro

Hi Adrian,

The state store names are local names, hence they don't have the applicationId prefix, i.e., they are laid out on disk like so:
/state.dir/applicationId/task/state-store-name
Their corresponding changelog topics are prefixed with the applicationId.
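
A small sketch of the two naming rules, following the layout described above. The helper names are hypothetical, the task id "0_0" and the application id are example values, and the real on-disk structure may contain further subdirectories. The changelog topic is assumed to follow the "<applicationId>-<storeName>-changelog" pattern.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class StoreNamingSketch {
    // Local store directory: the application id is part of the path,
    // not part of the store name itself.
    static Path storeDirectory(String stateDir, String applicationId,
                               String taskId, String storeName) {
        return Paths.get(stateDir, applicationId, taskId, storeName);
    }

    // Changelog topic: the store name prefixed with the application id.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String store = "KSTREAM-JOINTHIS-0000000004-store";
        System.out.println(storeDirectory("/state.dir", "my-app", "0_0", store));
        System.out.println(changelogTopic("my-app", store));
    }
}
```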

However, I can see that in the case of the schema registry this would cause an issue, as the serde will use the state store name in place of a topic name when serializing and deserializing the keys and values.

Which Avro Serde are you using?

There is an example serde here:
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/utils/SpecificAvroSerializer.java

You could potentially use something like this, but on line 57 prefix the topic name with the applicationId (if it doesn't already have it).
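
A minimal sketch of that prefixing step, kept free of Kafka dependencies so it runs on its own. The BiFunction stands in for an inner serializer's serialize(topic, data) call, and the class and method names are hypothetical. In a real serde the same helper would be applied to the topic argument before delegating to the Avro serializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

final class AppIdPrefixSketch {
    // Prefix the name with the application id unless it already has it
    // (changelog topics, for example, are already prefixed).
    static String withApplicationId(String applicationId, String topic) {
        String prefix = applicationId + "-";
        return topic.startsWith(prefix) ? topic : prefix + topic;
    }

    // Wrap a serialize(topic, data) function so it always sees the prefixed name.
    static <T> BiFunction<String, T, byte[]> prefixing(
            String applicationId, BiFunction<String, T, byte[]> inner) {
        return (topic, data) -> inner.apply(withApplicationId(applicationId, topic), data);
    }

    public static void main(String[] args) {
        // Stand-in "serializer" that just echoes the topic name it was given.
        BiFunction<String, String, byte[]> echoTopic =
                (topic, data) -> topic.getBytes(StandardCharsets.UTF_8);
        BiFunction<String, String, byte[]> wrapped = prefixing("my-app", echoTopic);

        byte[] out = wrapped.apply("KSTREAM-JOINTHIS-0000000004-store", "ignored");
        System.out.println(new String(out, StandardCharsets.UTF_8));
        // prints "my-app-KSTREAM-JOINTHIS-0000000004-store"
    }
}
```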

Would you mind raising a JIRA for this? It seems like an issue other people are yet to encounter.

Thanks,
Damian


