Posted to dev@kafka.apache.org by "RivenSun (Jira)" <ji...@apache.org> on 2022/04/22 13:53:00 UTC

[jira] [Resolved] (KAFKA-13849) All topics whose cleanup policy contains `compact` are not executing deleteLogStartOffsetBreachedSegments as expected

     [ https://issues.apache.org/jira/browse/KAFKA-13849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

RivenSun resolved KAFKA-13849.
------------------------------
    Resolution: Not A Bug

> All topics whose cleanup policy contains `compact` are not executing deleteLogStartOffsetBreachedSegments as expected
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13849
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13849
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.0.1
>            Reporter: RivenSun
>            Priority: Major
>
> This appears to be a *regression* of the behavior introduced by:
> https://issues.apache.org/jira/browse/KAFKA-7400
> The current logic in LogManager.cleanupLogs() does not match the behavior described in that JIRA.
> Analysis of the source code of LogManager.cleanupLogs():
> 1. When deletableLogs is computed, logs with `compact` enabled are filtered out in both branches of the if-else below.
> {code:java}
> // clean current logs.
> val deletableLogs = {
>   if (cleaner != null) {
>     // prevent cleaner from working on same partitions when changing cleanup policy
>     cleaner.pauseCleaningForNonCompactedPartitions()
>   } else {
>     currentLogs.filter {
>       case (_, log) => !log.config.compact
>     }
>   }
> }{code}
> 2. However, the comments and code of the subsequent UnifiedLog.deleteOldSegments method show that topics with `compact` enabled are also expected to execute deleteLogStartOffsetBreachedSegments.
> {code:java}
> /**
>  * If topic deletion is enabled, delete any local log segments that have either expired due to time based retention
>  * or because the log size is > retentionSize.
>  *
>  * Whether or not deletion is enabled, delete any local log segments that are before the log start offset
>  */
> def deleteOldSegments(): Int = {
>   if (config.delete) {
>     deleteLogStartOffsetBreachedSegments() +
>       deleteRetentionSizeBreachedSegments() +
>       deleteRetentionMsBreachedSegments()
>   } else {
>     deleteLogStartOffsetBreachedSegments()
>   }
> } {code}
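
The two quoted snippets can be lined up in a small standalone sketch. It is a simplified model, not Kafka code: `CleanupSketch`, `Config`, `Log`, `deletableWithoutCleaner` and `checksRun` are hypothetical names introduced only for illustration. It reproduces only the `else` branch of the quoted `deletableLogs` expression (the case where `cleaner` is null) and the branching of the quoted `deleteOldSegments`. The behavior of `cleaner.pauseCleaningForNonCompactedPartitions()` is not shown in the description above, so it is not modeled here.

```scala
// Simplified, hypothetical model of the quoted logic; these are not Kafka's real classes.
object CleanupSketch {
  // Stand-in for the two config flags referenced in the quoted code.
  final case class Config(compact: Boolean, delete: Boolean)

  // Stand-in for a log; `name` exists only to label the examples.
  final case class Log(name: String, config: Config)

  // Mirrors the `else` branch of the quoted deletableLogs expression:
  // every log whose policy contains `compact` is filtered out.
  def deletableWithoutCleaner(logs: Seq[Log]): Seq[Log] =
    logs.filter(log => !log.config.compact)

  // Mirrors the branching of the quoted deleteOldSegments: returns the names
  // of the segment checks that would run for the given config.
  def checksRun(config: Config): Seq[String] =
    if (config.delete)
      Seq("logStartOffset", "retentionSize", "retentionMs")
    else
      Seq("logStartOffset")

  // Example logs covering the three cleanup policy combinations.
  val exampleLogs: Seq[Log] = Seq(
    Log("delete-only", Config(compact = false, delete = true)),
    Log("compact-only", Config(compact = true, delete = false)),
    Log("compact-delete", Config(compact = true, delete = true))
  )

  def main(args: Array[String]): Unit = {
    val selected = deletableWithoutCleaner(exampleLogs).map(_.name)
    println(s"selected by the cleanupLogs sketch: $selected")
    exampleLogs.foreach { log =>
      println(s"${log.name}: deleteOldSegments would run ${checksRun(log.config)}")
    }
  }
}
```

In this model, `checksRun` includes the log start offset check for every config, while `deletableWithoutCleaner` never passes a log with `compact` enabled on to that step. That is the mismatch the description points at, restricted to the code path shown in the description.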



--
This message was sent by Atlassian Jira
(v8.20.7#820007)