Posted to users@kafka.apache.org by Xiyuan Hu <xi...@gmail.com> on 2019/09/19 22:48:41 UTC

Kafka Streams Rocksdb retention didn't remove old data with windowed function

Hi,

I'm running a Kafka Streams app (v2.1.0) with a windowed function, but
after 24 hours of running, local disk usage increased from 5G to 20G and
keeps increasing. From what I've read, once I introduce `windowedBy`,
old data should be removed automatically.

My topology looks like this:

stream.selectKey(selectKey A)
.groupByKey(..)
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
.reduce((value1, value2) -> value2)
.suppress(..).toStream()
.selectKey(selectKey B).mapValues(..).filter(..)
.groupByKey().reduce(..).toStream().to(..)
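
(For reference, here is a fuller sketch of the above in the Java DSL. The
topic names, serdes, and the selectKey / mapValues / filter bodies are
placeholders I'm using to keep the sketch compilable, not my real ones.)

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

stream
    .selectKey((key, value) -> value)                 // "selectKey A" (placeholder)
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
    .reduce((value1, value2) -> value2)               // keep latest value per key and window
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .selectKey((windowedKey, value) -> value)         // "selectKey B" (placeholder)
    .mapValues(value -> value)                        // placeholder transformation
    .filter((key, value) -> value != null)            // placeholder filter
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .reduce((value1, value2) -> value2)               // second, non-windowed reduce
    .toStream()
    .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));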

One thing I can't understand: this topology creates two internal
repartition topics, repartition-03 and repartition-14, one for each
groupBy. On disk, all machines that handle repartition-03 tasks show
high disk usage and never seem to remove old data, while machines
running repartition-14 tasks always stay at low disk usage.

When I log in to the machines, I find different paths on the two
kinds of machines, as below:

/tmp/kafka-streams/test-group/2_40/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000014

/tmp/kafka-streams/test-group/1_4/KSTREAM-REDUCE-STATE-STORE-0000000003/KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000

Why do they have different paths? 2_40 is one of the repartition-14
tasks and it has rocksdb in the path, while the other doesn't contain
rocksdb. Meanwhile, task 1_4 keeps a couple of folders like
KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000, but with different
suffixes.

I thought that once I introduced the windowedBy function, RocksDB would
remove old data when the window expired? And why do the above two
internal repartition topics have different paths and retention behavior?

Any help is highly appreciated! Thanks!

Re: Kafka Streams Rocksdb retention didn't remove old data with windowed function

Posted by "Matthias J. Sax" <ma...@confluent.io>.
The problem is that the first `reduce()` uses a windowed store and
applies a retention time to expire old windows.

However, the second `reduce()` is _not_ windowed. Windowed aggregations
over KTables (the result of the first aggregation is a KTable) are
currently not supported. Therefore, the second `reduce()` uses a plain
key-value store, and if your key space grows unbounded, the store grows
unbounded.
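
To make that concrete, here is a rough sketch of the two `reduce()` calls
with their stores spelled out via `Materialized` (store names, serdes, and
the retention value are made-up examples, not taken from your app):

// imports assumed: org.apache.kafka.common.utils.Bytes,
// org.apache.kafka.streams.kstream.Materialized,
// org.apache.kafka.streams.state.{WindowStore, KeyValueStore}

// First reduce(): backed by a windowed store. Old window segments are
// dropped once they fall out of the store's retention time.
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
.reduce(
    (value1, value2) -> value2,
    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("windowed-reduce-store")
        .withRetention(Duration.ofHours(2)))   // must be >= window size + grace

// Second reduce(): backed by a plain key-value store. There is no
// retention or expiry for key-value stores, so it keeps one entry per
// distinct key indefinitely.
.groupByKey()
.reduce(
    (value1, value2) -> value2,
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("keyvalue-reduce-store"))

The retention setting only exists for the windowed store; there is no
equivalent for the key-value store backing the second `reduce()`.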


-Matthias
