Posted to users@kafka.apache.org by Todd Hughes <ju...@hotmail.com> on 2018/05/24 23:52:07 UTC

KTable from compacted topic is using a changelog topic

From what I've read, a KTable sourced directly from a compacted topic is smart enough not to use a changelog topic in the background. I must be doing something wrong, though: I have a setup similar to the one below, and on the broker I can see that a topic named something like myappid-myStore-changelog is actually being used.

There's one sub-topology that creates the compacted topic, and the simple sub-topology below for the KTable.

KTable<String, String> myTable = streamsBuilder.table(myCompactedTopicName,
                                      Consumed.with(stringSerde, stringSerde),
                                      Materialized.as("myStore"));


It's used to get at the state store for occasional queries.

kvStore = kafkaStreams.store("myStore", QueryableStoreTypes.keyValueStore());

Am I way off base in my understanding/approach, or just missing some tweak?
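
For reference, the topic name seen on the broker matches the convention Kafka Streams uses for store changelog topics: the application.id, the store name, and the suffix "-changelog", joined with hyphens. A minimal sketch of that convention follows; the class and method names are hypothetical helpers for illustration, not part of the Kafka Streams API.

```java
// Hypothetical helper: reproduces the changelog topic naming convention
// <application.id>-<store name>-changelog so the expected topic name can
// be checked against what shows up on the broker.
final class ChangelogTopicName {

    static String of(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // "myappid" and "myStore" are the example names from the message above.
        System.out.println(of("myappid", "myStore"));
    }
}
```

If a topic with this name exists and is receiving writes, the store is being backed by its own changelog rather than by the source topic.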



Re: KTable from compacted topic is using a changelog topic

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Your understanding is correct.

Unfortunately, a regression slipped into the 1.0 release, so the
described optimization is not applied. It's fixed in the upcoming 2.0 release.


-Matthias
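
As a rough sketch of what picking up the fix might look like after upgrading: this assumes that from 2.0 onward the source-topic reuse is opt-in through the "topology.optimization" setting (value "all"), which is my reading of the upgrade notes and is not stated in the reply above. The class name and the bootstrap address are hypothetical, and plain string keys are used so that the snippet compiles without the Kafka jars.

```java
import java.util.Properties;

// Hypothetical helper: builds the configuration that would be handed to
// KafkaStreams. Assumption: in 2.0+ the KTable source-topic reuse is only
// applied when topology.optimization is set to "all".
final class OptimizedStreamsConfig {

    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Opt in to topology optimizations (assumed to gate the changelog reuse).
        props.put("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("myappid", "localhost:9092");
        System.out.println(props.getProperty("topology.optimization"));
    }
}
```

Later releases may also need the same Properties passed to StreamsBuilder.build(Properties) for the optimization to take effect, so it is worth checking the upgrade notes for the exact version in use.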
