Posted to users@kafka.apache.org by Kristopher Kane <kk...@gmail.com> on 2018/01/05 16:56:59 UTC

Re: Kafka Streams Avro SerDe version/id caching

Just a follow-up: a caching example for the DSL, with emphasis on the
consumer (deserializer) side:

// The second constructor argument (1024) is the client's identity map capacity.
final KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(
        new CachedSchemaRegistryClient(
                config.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG), 1024));

final KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(
        new CachedSchemaRegistryClient(
                config.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG), 1024));

// Raw Serde on purpose: both classes are typed on Object, not GenericRecord.
final Serde genericAvroSerde = Serdes.serdeFrom(kafkaAvroSerializer, kafkaAvroDeserializer);

// Passing the serde explicitly means the pre-built deserializer above is used.
final KStream<String, GenericRecord> stream =
        builder.stream(Serdes.String(), genericAvroSerde, incomingTopic);
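
For anyone skimming the thread, here is a minimal stdlib-only sketch of what
"caching by schema ID" amounts to. It is not the Confluent client's actual
code: SchemaCacheSketch, registryFetch and fetchCount are made-up names for
illustration, and the schema is a plain String standing in for a parsed Avro
schema.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Hypothetical illustration only; not how CachedSchemaRegistryClient is written.
final class SchemaCacheSketch {
    // Schemas already fetched, keyed by registry schema ID.
    private final Map<Integer, String> schemasById = new ConcurrentHashMap<>();
    // Stands in for the remote registry lookup (assumed to be the expensive part).
    private final IntFunction<String> registryFetch;
    // Counts how often the stand-in registry was actually called.
    private final AtomicInteger fetchCount = new AtomicInteger();

    SchemaCacheSketch(IntFunction<String> registryFetch) {
        this.registryFetch = registryFetch;
    }

    // Returns the schema for the given ID, calling the registry only on a cache miss.
    String getById(int id) {
        return schemasById.computeIfAbsent(id, key -> {
            fetchCount.incrementAndGet();
            return registryFetch.apply(key);
        });
    }

    int fetchCount() {
        return fetchCount.get();
    }
}
```

With something like new SchemaCacheSketch(id -> "schema-" + id), repeated
getById calls for the same ID reach the stand-in registry only once. Since
each serialized record carries its schema ID, that is roughly why only the
first record per schema should cost a registry round trip.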


On Fri, Oct 20, 2017 at 12:50 AM, Kristopher Kane <kk...@gmail.com>
wrote:

> I fixated on using the key/value deserializer classes in the consumer
> properties.  Using the overloaded consumer constructor that takes
> deserializer instances is the way to enable schema caching:
>
> // Build the registry client once and hand it to the deserializer.
> CachedSchemaRegistryClient cachedSchemaRegistryClient =
>         new CachedSchemaRegistryClient("registry_url", 1000);
> KafkaAvroDeserializer kafkaAvroDeserializer =
>         new KafkaAvroDeserializer(cachedSchemaRegistryClient);
> StringDeserializer stringDeserializer = new StringDeserializer();
>
> // Pass deserializer instances instead of class names in consumerProps.
> final KafkaConsumer<String, Object> consumer =
>         new KafkaConsumer<>(consumerProps, stringDeserializer, kafkaAvroDeserializer);
>
> In Streams, there is a similar overload for addSource:
>
> TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics)
>
> Kris
>
>
> On Tue, Oct 3, 2017 at 4:34 PM, Svante Karlsson <sv...@csi.se>
> wrote:
>
>> I've implemented the same logic for a C++ client. Caching is the only way
>> to go, since the performance impact of not doing it would be too big. So bet
>> on caching in all clients.
>>
>> 2017-10-03 18:12 GMT+02:00 Damian Guy <da...@gmail.com>:
>>
>> > If you are using the Confluent Schema Registry, then the schemas will be
>> > cached by the SchemaRegistryClient.
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 3 Oct 2017 at 09:00 Ted Yu <yu...@gmail.com> wrote:
>> >
>> > > I did a quick search in the code base - there doesn't seem to be caching
>> > > as you described.
>> > >
>> > > On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane <kkane.list@gmail.com>
>> > > wrote:
>> > >
>> > > > If using a Byte SerDe and schema registry in the consumer configs of a
>> > > > Kafka Streams application, does it cache the Avro schemas by ID and
>> > > > version after fetching from the registry once?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Kris
>> > > >
>> > >
>> >
>>
>
>