Posted to users@kafka.apache.org by Björn Häuser <bj...@gmail.com> on 2018/03/28 10:23:06 UTC

Kafka Streams Internal Topic Retention not applied

Hello Everyone,

we are running a Kafka Streams application which does time window aggregates (using Kafka 1.0.0).

Unfortunately, one of the changelog topics is now growing quite a bit in size, maxing out the brokers. I did not find any setting in the Kafka Streams properties to configure retention, so I went ahead and set the topic's retention.bytes to 15 GB. Unfortunately, this does not seem to apply, and the topic size is still around 140 GB.

Is this intended? I could not find any documentation about setting the retention size for the internal topics.

Thanks
Björn



Re: Kafka Streams Internal Topic Retention not applied

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Björn,

Couple of answers:

>> So, a streams internal topic for aggregation will have cleanup.policy = compact.

Yes. (for non-windowed aggregation)

However, in your case, you are using a windowed aggregation, and there
the policy is "compact,delete". Because the key space of a windowed
aggregation is not bounded, the changelog topic would grow without bound
if there were no retention time (the written key is a combined key of
the message key plus the window start time). Thus, windows are
maintained for 1 day (by default) until those records are deleted from
the changelog topic (even if no tombstone message is written).
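If you want to double check what policy the changelog topic actually
carries, you can inspect its topic-level configs. A minimal sketch using
the Java AdminClient -- the broker address and topic name below are
placeholders; changelog topics are named
<application.id>-<store name>-changelog:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeChangelog {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // placeholder name; adjust to your application.id and store name
            ConfigResource topic = new ConfigResource(
                ConfigResource.Type.TOPIC, "my-app-my-store-changelog");
            // fetch the effective topic-level configs from the brokers
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            System.out.println("cleanup.policy = " + config.get("cleanup.policy").value());
            System.out.println("retention.ms   = " + config.get("retention.ms").value());
        }
    }
}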

>> Which means that while doing aggregation, ignoring tombstone records will cause havoc, right?

If your key space is unbounded, yes. For a bounded key space, the size
of the topic is limited by the number of unique keys -- thus, even if no
tombstones are written, the topic will not grow without bound.

>> Which means we have to remove the last line so that tombstone records are passed through and topic compaction can kick in?

Yes.

>> And tombstone records will be deleted after "delete.retention.ms", right?

Yes.

>> Which defaults to 24 hours - meaning that the internal topic should only contain data for 24 hours + window size? Is this roughly right?

Not exactly. Note that Kafka Streams does not delete windows when the
window end time passes (windows don't close in Kafka Streams). This is
required to handle late-arriving data. Thus, you can have multiple
windows per key, and Kafka Streams also applies a retention time for how
long to maintain windows before they are dropped (by default, also 24h).
You can configure the window retention time via `.until()`.
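For example, a minimal sketch of a windowed count that keeps windows for
48 hours instead of the default 24 (the input topic name and key/value
types are just placeholders):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("events"); // placeholder topic

KTable<Windowed<String>, Long> counts = events
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))    // 5-minute windows
                           .until(TimeUnit.HOURS.toMillis(48))) // keep windows 48h
    .count();

As far as I know, the `until()` value also factors into the retention.ms
that Streams sets on the window store's changelog topic.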

>> Again, thank you very much for taking the time to answer these questions, I am feeling a bit stupid here right now :(

No need to. It's complex...


-Matthias



Re: Kafka Streams Internal Topic Retention not applied

Posted by Björn Häuser <bj...@gmail.com>.
Hello Matthias,

thank you very much for your patience.

I am still trying to understand the complete picture here.

So, a streams internal topic for aggregation will have cleanup.policy = compact.

Which means that while doing aggregation, ignoring tombstone records will cause havoc, right?

This is more or less our streams setup: 

https://gist.github.com/bjoernhaeuser/be1ffb06e5e970a50a5ffb309ded4b6b

Which means we have to remove the last line so that tombstone records are passed through and topic compaction can kick in?

And tombstone records will be deleted after "delete.retention.ms", right? Which defaults to 24 hours - meaning that the internal topic should only contain data for 24 hours + window size? Is this roughly right?

Again, thank you very much for taking the time to answer these questions, I am feeling a bit stupid here right now :(

Björn



Re: Kafka Streams Internal Topic Retention not applied

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Björn,

broker configs are only defaults and can be overwritten when a topic is
created. This happens when Kafka Streams creates its internal topics.
Thus, you need to change the settings Kafka Streams applies when
creating topics.

Also note: if cleanup.policy = compact, the `log.retention.X` settings
do not apply. Those settings only apply if cleanup.policy is set to "delete".

The size of a compacted topic depends on the number of unique keys --
there will be one message per key -- if a newer message with the same
key is written, older messages with this key can be garbage collected.
If a message is never updated (and never explicitly deleted with a
tombstone record, i.e., a record with a null value), the record will
never be deleted from a compacted topic.
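To illustrate the "overwritten when a topic is created" part, here is a
minimal sketch using the Java AdminClient; the broker address, topic
name, partition count, replication factor, and retention value are all
placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithOverride {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // the topic-level retention.bytes set here overrides the
            // broker's log.retention.bytes default for this topic only
            NewTopic topic = new NewTopic("example-topic", 3, (short) 2)
                .configs(Collections.singletonMap("retention.bytes", "16106127360"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}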


-Matthias



Re: Kafka Streams Internal Topic Retention not applied

Posted by Björn Häuser <bj...@gmail.com>.
Hello Guozhang

thanks.

So after reading a lot more docs, I still do not have the complete picture.

These are the relevant settings from our Kafka broker configuration:

log.cleanup.policy=delete
# set log.retention.bytes to 15 GB
log.retention.bytes=16106127360
# set log.retention.hours to 30 days (720 hours)
log.retention.hours=720

Though one of the internal Kafka Streams topics (with cleanup.policy = compact) grew to ~40 GB today.

What am I missing? I thought these settings should ensure that a given topic never exceeds 15 GB, right?

Thanks
Björn



Re: Kafka Streams Internal Topic Retention not applied

Posted by Guozhang Wang <wa...@gmail.com>.
Hello,

You can set topic-level configs via StreamsConfig#topicPrefix(String);
please see the following web docs (search for KIP-173):

https://kafka.apache.org/documentation/streams/upgrade-guide#streams_api_changes_100
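For example, a sketch of what that could look like; the application id,
broker address, and retention value are placeholders:

import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// KIP-173: configs carrying the "topic." prefix are applied to the
// internal (changelog / repartition) topics that Streams creates
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_BYTES_CONFIG),
          "16106127360");

Note that, as far as I know, this only takes effect when the internal
topics are created; for topics that already exist you would need to
alter the topic configs directly.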


Guozhang





-- 
-- Guozhang