Posted to jira@kafka.apache.org by "Niklas Lönn (JIRA)" <ji...@apache.org> on 2018/10/15 10:04:00 UTC

[jira] [Updated] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records

     [ https://issues.apache.org/jira/browse/KAFKA-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Niklas Lönn updated KAFKA-7506:
-------------------------------
    Description: 
Hi, we are using Kafka Streams to process a compacted store. When we reset the application and reprocess from scratch, the repartition topics are created with the default topic configuration of 50 MB segments and a 10-minute segment interval.

 

Because retention.ms is not set on these topics, the default retention.ms applies, and the log cleaner starts competing with the application, effectively causing the Streams app to skip records.

{{Application logs the following:}}

{{Fetch offset 213792 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7, resetting offset}}
{{Fetch offset 110227 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2, resetting offset}}
{{Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7 to offset 233302.}}
{{Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2 to offset 119914.}}
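
A minimal sketch of why old records may be affected, assuming time-based retention is evaluated against the newest record timestamp in a segment and that the records written to the repartition topic keep their original (old) timestamps. The class name, the 30-day record age and the 7-day default are illustrative assumptions, and this is a simplified model rather than the broker's actual code.

```java
import java.util.concurrent.TimeUnit;

// Simplified model of time-based retention; NOT the broker implementation.
class RetentionSketch {

    /** Returns true if a segment whose newest record has this timestamp may be deleted. */
    static boolean eligibleForDeletion(long largestTimestampMs, long nowMs, long retentionMs) {
        if (retentionMs < 0) {
            return false; // a negative value (e.g. -1) disables time-based retention
        }
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long sevenDays = TimeUnit.DAYS.toMillis(7);          // assumed default retention
        long oldRecord = now - TimeUnit.DAYS.toMillis(30);   // hypothetical old record timestamp

        // With a finite retention, a freshly rolled segment of old records is already expired.
        System.out.println(eligibleForDeletion(oldRecord, now, sevenDays)); // prints "true"
        // With retention.ms = -1 the same segment is kept.
        System.out.println(eligibleForDeletion(oldRecord, now, -1L));       // prints "false"
    }
}
```

Under these assumptions, small and frequently rolled segments make it more likely that a segment is removed before the application has fetched it, which would match the out-of-range offsets above.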

Adding the following configuration to RepartitionTopicConfig.java solves the issue:

{{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // Infinite}}

 
 My understanding is that this should be safe as KafkaStreams uses the admin API to delete segments.
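
For comparison, a similar override might be applied from the application side without patching Kafka Streams. This is a sketch under assumptions: it relies on the `topic.` prefix (the one `StreamsConfig.topicPrefix(...)` prepends) being honored for internal topics in the version in use, and it would affect all internal topics, not only repartition topics. The class and method names and the bootstrap address are placeholders.

```java
import java.util.Properties;

// Hypothetical helper; builds plain Properties so it compiles without the Kafka jars.
class InternalTopicRetentionSketch {

    // "topic." is the prefix StreamsConfig.topicPrefix(...) prepends; spelled out here
    // so the sketch has no Kafka Streams dependency.
    static final String TOPIC_PREFIX = "topic.";

    /** Returns a copy of the given config with retention.ms = -1 for internal topics. */
    static Properties withInfiniteRetention(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(TOPIC_PREFIX + "retention.ms", "-1"); // -1 = no time-based retention
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "app-id");             // matches the prefix in the logs above
        base.put("bootstrap.servers", "localhost:9092");  // placeholder address
        Properties props = withInfiniteRetention(base);
        System.out.println(props.getProperty("topic.retention.ms")); // prints "-1"
    }
}
```

The resulting Properties would be passed to the KafkaStreams constructor as usual. Whether an existing repartition topic picks up the new setting, or only newly created ones do, is not covered by this sketch.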
  


> KafkaStreams repartition topic settings not suitable for processing old records
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-7506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7506
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Niklas Lönn
>            Priority: Major


