You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Kristopher Kane <kk...@gmail.com> on 2017/10/03 13:36:53 UTC

Kafka Streams Avro SerDe version/id caching

If using a Byte SerDe and schema registry in the consumer configs of a
Kafka streams application, does it cache the Avro schemas by ID and version
after fetching from the registry once?

Thanks,

Kris

Re: Kafka Streams Avro SerDe version/id caching

Posted by Kristopher Kane <kk...@gmail.com>.
Just a follow up caching example for the DSL with emphasis on the consumer
(deserializer):

final KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(
        new CachedSchemaRegistryClient(config.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
1024));

final KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(
        new CachedSchemaRegistryClient(config.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
1024));

final Serde genericAvroSerde = Serdes.serdeFrom(kafkaAvroSerializer,
kafkaAvroDeserializer);

final KStream<String, GenericRecord> stream =
builder.stream(Serdes.String(), genericAvroSerde, incomingTopic);


On Fri, Oct 20, 2017 at 12:50 AM, Kristopher Kane <kk...@gmail.com>
wrote:

> I fixated on using the key/value deserializer classes in the consumer
> properties.  Overloading the consumer constructor is the way to enable
> schema caching:
>
> CachedSchemaRegistryClient cachedSchemaRegistryClient = new
> CachedSchemaRegistryClient("registry_url", 1000);
> KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(
> cachedSchemaRegistryClient);
> StringDeserializer stringDeserializer = new StringDeserializer();
>
> final KafkaConsumer consumer = new KafkaConsumer(consumerProps,
> stringDeserializer , kafkaAvroDeserializer);
>
> In Streams, there is a similar overload for addSource:
>
> TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics)
>
> Kris
>
>
> On Tue, Oct 3, 2017 at 4:34 PM, Svante Karlsson <sv...@csi.se>
> wrote:
>
>> I've implemented the same logic for a c++ client - caching is the only way
>> to go since the performance impact of not doing it would be to big. So bet
>> on caching on all clients.
>>
>> 2017-10-03 18:12 GMT+02:00 Damian Guy <da...@gmail.com>:
>>
>> > If you are using the confluent schema registry then the will be cached
>> by
>> > the SchemaRegistryClient.
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 3 Oct 2017 at 09:00 Ted Yu <yu...@gmail.com> wrote:
>> >
>> > > I did a quick search in the code base - there doesn't seem to be
>> caching
>> > as
>> > > you described.
>> > >
>> > > On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane <kkane.list@gmail.com
>> >
>> > > wrote:
>> > >
>> > > > If using a Byte SerDe and schema registry in the consumer configs
>> of a
>> > > > Kafka streams application, does it cache the Avro schemas by ID and
>> > > version
>> > > > after fetching from the registry once?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Kris
>> > > >
>> > >
>> >
>>
>
>

Re: Kafka Streams Avro SerDe version/id caching

Posted by Kristopher Kane <kk...@gmail.com>.
I fixated on using the key/value deserializer classes in the consumer
properties.  Overloading the consumer constructor is the way to enable
schema caching:

CachedSchemaRegistryClient cachedSchemaRegistryClient = new
CachedSchemaRegistryClient("registry_url", 1000);
KafkaAvroDeserializer kafkaAvroDeserializer = new
KafkaAvroDeserializer(cachedSchemaRegistryClient);
StringDeserializer stringDeserializer = new StringDeserializer();

final KafkaConsumer consumer = new KafkaConsumer(consumerProps,
stringDeserializer , kafkaAvroDeserializer);

In Streams, there is a similar overload for addSource:

TopologyBuilder addSource(String name, Deserializer keyDeserializer,
Deserializer valDeserializer, String... topics)

Kris


On Tue, Oct 3, 2017 at 4:34 PM, Svante Karlsson <sv...@csi.se>
wrote:

> I've implemented the same logic for a c++ client - caching is the only way
> to go since the performance impact of not doing it would be to big. So bet
> on caching on all clients.
>
> 2017-10-03 18:12 GMT+02:00 Damian Guy <da...@gmail.com>:
>
> > If you are using the confluent schema registry then the will be cached by
> > the SchemaRegistryClient.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 3 Oct 2017 at 09:00 Ted Yu <yu...@gmail.com> wrote:
> >
> > > I did a quick search in the code base - there doesn't seem to be
> caching
> > as
> > > you described.
> > >
> > > On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane <kk...@gmail.com>
> > > wrote:
> > >
> > > > If using a Byte SerDe and schema registry in the consumer configs of
> a
> > > > Kafka streams application, does it cache the Avro schemas by ID and
> > > version
> > > > after fetching from the registry once?
> > > >
> > > > Thanks,
> > > >
> > > > Kris
> > > >
> > >
> >
>

Re: Kafka Streams Avro SerDe version/id caching

Posted by Svante Karlsson <sv...@csi.se>.
I've implemented the same logic for a c++ client - caching is the only way
to go since the performance impact of not doing it would be to big. So bet
on caching on all clients.

2017-10-03 18:12 GMT+02:00 Damian Guy <da...@gmail.com>:

> If you are using the confluent schema registry then the will be cached by
> the SchemaRegistryClient.
>
> Thanks,
> Damian
>
> On Tue, 3 Oct 2017 at 09:00 Ted Yu <yu...@gmail.com> wrote:
>
> > I did a quick search in the code base - there doesn't seem to be caching
> as
> > you described.
> >
> > On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane <kk...@gmail.com>
> > wrote:
> >
> > > If using a Byte SerDe and schema registry in the consumer configs of a
> > > Kafka streams application, does it cache the Avro schemas by ID and
> > version
> > > after fetching from the registry once?
> > >
> > > Thanks,
> > >
> > > Kris
> > >
> >
>

Re: Kafka Streams Avro SerDe version/id caching

Posted by Damian Guy <da...@gmail.com>.
If you are using the confluent schema registry then the will be cached by
the SchemaRegistryClient.

Thanks,
Damian

On Tue, 3 Oct 2017 at 09:00 Ted Yu <yu...@gmail.com> wrote:

> I did a quick search in the code base - there doesn't seem to be caching as
> you described.
>
> On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane <kk...@gmail.com>
> wrote:
>
> > If using a Byte SerDe and schema registry in the consumer configs of a
> > Kafka streams application, does it cache the Avro schemas by ID and
> version
> > after fetching from the registry once?
> >
> > Thanks,
> >
> > Kris
> >
>

Re: Kafka Streams Avro SerDe version/id caching

Posted by Ted Yu <yu...@gmail.com>.
I did a quick search in the code base - there doesn't seem to be caching as
you described.

On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane <kk...@gmail.com>
wrote:

> If using a Byte SerDe and schema registry in the consumer configs of a
> Kafka streams application, does it cache the Avro schemas by ID and version
> after fetching from the registry once?
>
> Thanks,
>
> Kris
>