Posted to users@kafka.apache.org by Martin Kleppmann <ma...@kleppmann.com> on 2018/01/29 15:44:45 UTC

Log segment deletion

Hi all,

We are debugging an issue with a Kafka Streams application that is producing incorrect output. The application is a simple group-by on a key, and then count. As expected, the application creates a repartitioning topic for the group-by stage. The problem appears to be that messages are getting lost in the repartitioning topic.

Looking at the Kafka broker logs, it appears that the log segments for the repartitioning topic are getting marked for deletion very aggressively (within ~2 seconds of being created), so fast that some segments are deleted before the count stage of the Kafka Streams application has had a chance to consume the messages.

I have checked the configuration and I cannot see a reason why the log segments should be getting deleted so quickly. The following line reports the creation of the repartitioning topic:

[2018-01-29 15:31:39,992] INFO Created log for partition [streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition,0] in /kafka-data with properties {compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 100000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> 1073741824, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 3600000, segment.bytes -> 1073741824, retention.ms -> 86400000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)

As you can see, retention is set to 24 hours, retention size 1 GB, segment rolling time to 1 hour, segment size 1 GB. For test purposes we are running the Streams app on a fixed input of 7,000 messages, with a total size of only about 5.5 MB, so we shouldn't be getting anywhere near the segment or retention limits. The input topic has only one partition.
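
As a quick sanity check, the raw values in the log line above can be converted to readable units. This is only a small sketch in Python; the variable names are illustrative, and the 5.5 MB input size is the approximate figure quoted above, not a measured value.

```python
# Topic properties copied from the "Created log for partition" line above.
props = {
    "retention.ms": 86_400_000,
    "segment.ms": 3_600_000,
    "retention.bytes": 1_073_741_824,
    "segment.bytes": 1_073_741_824,
    "file.delete.delay.ms": 100_000,
}

MS_PER_HOUR = 60 * 60 * 1000
GIB = 2 ** 30

retention_hours = props["retention.ms"] / MS_PER_HOUR
segment_hours = props["segment.ms"] / MS_PER_HOUR
retention_gib = props["retention.bytes"] / GIB
segment_gib = props["segment.bytes"] / GIB
delete_delay_seconds = props["file.delete.delay.ms"] / 1000

# Approximate size of the test input (about 5.5 MB) relative to the size limit.
input_bytes = 5.5 * 1000 * 1000
fraction_of_limit = input_bytes / props["retention.bytes"]

print(f"retention: {retention_hours} h, {retention_gib} GiB")
print(f"segment roll: {segment_hours} h, {segment_gib} GiB")
print(f"delete delay: {delete_delay_seconds} s")
print(f"input uses {fraction_of_limit:.2%} of retention.bytes")
```

None of the size or time limits is close to being reached by the test input, so on their own these settings do not explain the deletions.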

Just two seconds after the topic is created, the broker reports that it is rolling log segments and scheduling old log segments for deletion:

[2018-01-29 15:31:41,923] INFO Rolled new log segment for 'streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0' in 1 ms. (kafka.log.Log)
[2018-01-29 15:31:41,924] INFO Scheduling log segment 0 for log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 for deletion. (kafka.log.Log)
[2018-01-29 15:31:41,945] INFO Cleared earliest 0 entries from epoch cache based on passed offset 6582 leaving 1 in EpochFile for partition streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 (kafka.server.epoch.LeaderEpochFileCache)
[2018-01-29 15:31:42,923] INFO Rolled new log segment for 'streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0' in 2 ms. (kafka.log.Log)
[2018-01-29 15:31:42,924] INFO Scheduling log segment 6582 for log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 for deletion. (kafka.log.Log)

100 seconds later (consistent with the setting for file.delete.delay.ms), the files are actually deleted:

[2018-01-29 15:33:21,923] INFO Deleting segment 0 from log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0. (kafka.log.Log)
[2018-01-29 15:33:21,929] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[2018-01-29 15:33:21,929] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000000000.timeindex.deleted (kafka.log.TimeIndex)
[2018-01-29 15:33:22,925] INFO Deleting segment 6582 from log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0. (kafka.log.Log)
[2018-01-29 15:33:22,926] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000006582.index.deleted (kafka.log.OffsetIndex)
[2018-01-29 15:33:22,927] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000006582.timeindex.deleted (kafka.log.TimeIndex)
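
The 100-second gap can be checked directly from the timestamps in the log lines above. The following is a minimal sketch; the helper name `parse` and the dictionary layout are illustrative only.

```python
from datetime import datetime

# Timestamp format used by the broker log lines quoted above.
FMT = "%Y-%m-%d %H:%M:%S,%f"


def parse(ts: str) -> datetime:
    """Parse a broker log timestamp such as '2018-01-29 15:31:41,924'."""
    return datetime.strptime(ts, FMT)


# segment base offset -> ("Scheduling ... for deletion" time, "Deleting segment" time)
events = {
    0: ("2018-01-29 15:31:41,924", "2018-01-29 15:33:21,923"),
    6582: ("2018-01-29 15:31:42,924", "2018-01-29 15:33:22,925"),
}

# Delay in seconds between scheduling and actual deletion, per segment.
delays = {
    segment: (parse(deleted) - parse(scheduled)).total_seconds()
    for segment, (scheduled, deleted) in events.items()
}

for segment, delay in delays.items():
    print(f"segment {segment}: deleted {delay:.3f} s after being scheduled")
```

Both delays come out within a few milliseconds of 100 seconds, which matches file.delete.delay.ms -> 100000.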

Does anyone know what might be causing the messages in the repartitioning topic to be deleted so aggressively?

Thanks,
Martin



Re: Log segment deletion

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Martin,

That is a good point. In fact, in the coming release we have made such
repartition topics truly "transient" by periodically purging them with the
embedded admin client, so we can actually set their retention to -1:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
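
The idea of purging by consumed offset rather than by timestamp can be sketched with a toy model. This is not the Kafka Streams implementation: the class ToyRepartitionLog, its methods, and the committed offset value are all hypothetical, and the sketch only assumes that records are removed once the downstream stage has committed past them.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRepartitionLog:
    """Hypothetical in-memory stand-in for one repartition topic partition."""
    records: dict = field(default_factory=dict)  # offset -> (timestamp_ms, value)
    next_offset: int = 0
    log_start_offset: int = 0

    def append(self, timestamp_ms: int, value: str) -> int:
        """Append a record and return the offset it was stored at."""
        offset = self.next_offset
        self.records[offset] = (timestamp_ms, value)
        self.next_offset += 1
        return offset

    def purge_before(self, offset: int) -> None:
        """Drop every record below `offset`, ignoring record timestamps."""
        offset = min(offset, self.next_offset)
        for old in range(self.log_start_offset, offset):
            self.records.pop(old, None)
        self.log_start_offset = max(self.log_start_offset, offset)


log = ToyRepartitionLog()
for i in range(5):
    # Timestamp 0 stands for an arbitrarily old event time.
    log.append(timestamp_ms=0, value=f"event-{i}")

committed_offset = 3  # assumed: the count stage has consumed offsets 0..2
log.purge_before(committed_offset)
remaining = sorted(log.records)
print(remaining)
```

In this model old timestamps never cause data loss, because a record only disappears after the consumer has moved past it.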


For non-internal topics such as sink topics, though, you still need to
set the configs manually so that old records can be appended.

Guozhang


In reply to Martin Kleppmann <ma...@kleppmann.com>, Tue, Jan 30, 2018 at 11:57 AM.


-- 
-- Guozhang

Re: Log segment deletion

Posted by Martin Kleppmann <ma...@kleppmann.com>.
Hi Guozhang,

Thanks very much for your reply. I am inclined to consider this a bug, since Kafka Streams in the default configuration is likely to run into this problem while reprocessing old messages, and in most cases the problem wouldn't be noticed (since there is no error -- the job just produces incorrect output).

The repartitioning topics are already created with a config of cleanup.policy=delete, regardless of the brokers' default config. Would it make sense for Kafka Streams to also set a config of retention.ms=-1 or message.timestamp.type=LogAppendTime on repartitioning topics when they are created? However, neither setting is ideal (if time-based retention is set to infinite, retention.bytes needs to be configured instead; if LogAppendTime is used, the original message timestamps are lost, which may break windowing functions). Or maybe Kafka Streams can throw an exception if it processes messages that are older than the retention period, to ensure that the developer notices the problem, rather than having messages silently dropped?
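
To illustrate the windowing concern, here is a small sketch of how replacing event timestamps with append timestamps could change window assignment. It assumes epoch-aligned tumbling windows, where a window's start is the timestamp rounded down to a multiple of the window size; the helper window_start and all timestamps are made up for the example.

```python
HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS


def window_start(timestamp_ms: int, size_ms: int = HOUR_MS) -> int:
    """Start of the epoch-aligned tumbling window containing the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)


now_ms = 1_517_239_900_000  # an arbitrary "current" wall-clock instant

# Two events that happened three hours apart, about a week before `now_ms`.
event_a_ms = now_ms - 7 * DAY_MS - 3 * HOUR_MS
event_b_ms = now_ms - 7 * DAY_MS

# CreateTime: the original event timestamps are kept, so the events
# land in different one-hour windows.
create_time_windows = (window_start(event_a_ms), window_start(event_b_ms))

# LogAppendTime: the timestamps are replaced by the append time, so during
# reprocessing both events land in the same "now" window.
append_a_ms, append_b_ms = now_ms, now_ms + 5
append_time_windows = (window_start(append_a_ms), window_start(append_b_ms))

print(create_time_windows[0] != create_time_windows[1])
print(append_time_windows[0] == append_time_windows[1])
```

A windowed count downstream of the repartition topic would therefore see one crowded window under LogAppendTime instead of two separate historical windows.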

Best,
Martin

In reply to Guozhang Wang <wa...@gmail.com>, 29 Jan 2018 at 18:01.


Re: Log segment deletion

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Martin,

What you've observed is correct. More generally speaking, various
broker-side operations are based on record timestamps and treat them as
wall-clock time, so there is a mismatch between the stream records'
timestamps, which are basically "event time", and the broker's system
wall-clock time (i.e. when the events happened vs. when they get ingested
into Kafka).

For your example, when brokers decide when to roll a new segment or delete
an old segment, they are effectively comparing the record timestamp with
the system "wall-clock time". This has a bad effect on re-processing and
bootstrapping scenarios (think of processing a record whose timestamp is
from a week ago and then trying to send it to the intermediate topic at
system time "NOW").
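
The effect can be sketched with a toy retention check. This is not the broker's actual code: it assumes a segment counts as expired once the wall-clock time minus the largest record timestamp in the segment exceeds retention.ms, and the function name and timestamps are hypothetical.

```python
DAY_MS = 24 * 60 * 60 * 1000
RETENTION_MS = 86_400_000  # retention.ms of the repartition topic (24 hours)


def segment_expired(largest_record_ts_ms: int, now_ms: int,
                    retention_ms: int = RETENTION_MS) -> bool:
    """Toy check comparing record time against broker wall-clock time."""
    if retention_ms < 0:
        # A negative value such as -1 disables time-based retention.
        return False
    return now_ms - largest_record_ts_ms > retention_ms


now_ms = 1_517_239_900_000  # an arbitrary broker wall-clock instant

week_old_ts = now_ms - 7 * DAY_MS  # reprocessed record keeping its event time
fresh_ts = now_ms - 1_000          # record created one second ago

reprocessed_expired = segment_expired(week_old_ts, now_ms)
fresh_expired = segment_expired(fresh_ts, now_ms)
unlimited_expired = segment_expired(week_old_ts, now_ms, retention_ms=-1)

print(reprocessed_expired, fresh_expired, unlimited_expired)
```

Under these assumptions a segment holding only week-old records is eligible for deletion the moment it is written, while the same records survive when time-based retention is disabled.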

We are actively discussing how to close this gap. As of now, your
workaround looks good to me, or you can also consider setting the
broker config "log.message.timestamp.difference.max.ms" to a very large
value.


Guozhang


In reply to Martin Kleppmann <ma...@kleppmann.com>, Mon, Jan 29, 2018 at 8:23 AM.


-- 
-- Guozhang

Re: Log segment deletion

Posted by Martin Kleppmann <ma...@kleppmann.com>.
Follow-up: I think we figured out what was happening. Setting the broker config log.message.timestamp.type=LogAppendTime (instead of the default value CreateTime) stopped the messages disappearing.

The messages in the Streams app's input topic are older than the 24-hour default retention period. On this input topic, we have set an unlimited retention period. Am I right in thinking that with the setting message.timestamp.type=CreateTime, the input message timestamp is carried over to the messages in the repartitioning topic? And furthermore, that the time-based retention of messages in the repartitioning topic is based on the message timestamps?

If that is the case, then messages that are older than 24 hours are immediately scheduled for deletion as soon as they are copied over into the repartitioning topic. Thus, we have a race between the log cleaner and the consumer in the second stage of the streams app. :-(

Is log.message.timestamp.type=LogAppendTime the best way of avoiding this problem?

Thanks,
Martin

> On 29 Jan 2018, at 15:44, Martin Kleppmann <ma...@kleppmann.com> wrote:
> 
> Hi all,
> 
> We are debugging an issue with a Kafka Streams application that is producing incorrect output. The application is a simple group-by on a key, and then count. As expected, the application creates a repartitioning topic for the group-by stage. The problem appears to be that messages are getting lost in the repartitioning topic.
> 
> Looking at the Kafka broker logs, it appears that the log segments for the repartitioning topic are getting marked for deletion very aggressively (within ~2 seconds of being created), so fast that some segments are deleted before the count stage of the Kafka Streams application has had a chance to consume the messages.
> 
> I have checked the configuration and I cannot see a reason why the log segments should be getting deleted so quickly. The following line reports the creation of the repartitioning topic:
> 
> [2018-01-29 15:31:39,992] INFO Created log for partition [streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition,0] in /kafka-data with properties {compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 100000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> 1073741824, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 3600000, segment.bytes -> 1073741824, retention.ms -> 86400000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
> 
> As you can see, retention is set to 24 hours, retention size 1 GB, segment rolling time to 1 hour, segment size 1 GB. For test purposes we are running the Streams app on a fixed input of 7,000 messages, with a total size of only about 5.5 MB, so we shouldn't be getting anywhere near the segment or retention limits. The input topic has only one partition.
> 
> Just two seconds after the topic is created, the broker reports that it is rolling log segments and scheduling old log segments for deletion:
> 
> [2018-01-29 15:31:41,923] INFO Rolled new log segment for 'streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0' in 1 ms. (kafka.log.Log)
> [2018-01-29 15:31:41,924] INFO Scheduling log segment 0 for log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 for deletion. (kafka.log.Log)
> [2018-01-29 15:31:41,945] INFO Cleared earliest 0 entries from epoch cache based on passed offset 6582 leaving 1 in EpochFile for partition streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 (kafka.server.epoch.LeaderEpochFileCache)
> [2018-01-29 15:31:42,923] INFO Rolled new log segment for 'streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0' in 2 ms. (kafka.log.Log)
> [2018-01-29 15:31:42,924] INFO Scheduling log segment 6582 for log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 for deletion. (kafka.log.Log)
> 
> 100 seconds later (consistent with the setting for file.delete.delay.ms), the files are actually deleted:
> 
> [2018-01-29 15:33:21,923] INFO Deleting segment 0 from log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0. (kafka.log.Log)
> [2018-01-29 15:33:21,929] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
> [2018-01-29 15:33:21,929] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000000000.timeindex.deleted (kafka.log.TimeIndex)
> [2018-01-29 15:33:22,925] INFO Deleting segment 6582 from log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0. (kafka.log.Log)
> [2018-01-29 15:33:22,926] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000006582.index.deleted (kafka.log.OffsetIndex)
> [2018-01-29 15:33:22,927] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000006582.timeindex.deleted (kafka.log.TimeIndex)
> 
> Does anyone know what might be causing the messages in the repartitioning topic to be deleted so aggressively?
> 
> Thanks,
> Martin
> 
>