Posted to dev@kafka.apache.org by "RivenSun (Jira)" <ji...@apache.org> on 2022/04/22 13:53:00 UTC
[jira] [Resolved] (KAFKA-13849) All topics whose cleanup policy contains `compact` are not executing deleteLogStartOffsetBreachedSegments as expected
[ https://issues.apache.org/jira/browse/KAFKA-13849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
RivenSun resolved KAFKA-13849.
------------------------------
Resolution: Not A Bug
> All topics whose cleanup policy contains `compact` are not executing deleteLogStartOffsetBreachedSegments as expected
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13849
> URL: https://issues.apache.org/jira/browse/KAFKA-13849
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.0.1
> Reporter: RivenSun
> Priority: Major
>
> This looks like a *regression* of the behavior introduced by:
> https://issues.apache.org/jira/browse/KAFKA-7400
> The current logic in LogManager.cleanupLogs() no longer follows that JIRA.
> Analyze the source code of LogManager.cleanupLogs():
> 1. When deletableLogs is computed, logs with `compact` enabled are filtered out in both branches of the if-else below.
> {code:java}
> // clean current logs.
> val deletableLogs = {
>   if (cleaner != null) {
>     // prevent cleaner from working on same partitions when changing cleanup policy
>     cleaner.pauseCleaningForNonCompactedPartitions()
>   } else {
>     currentLogs.filter {
>       case (_, log) => !log.config.compact
>     }
>   }
> }{code}
> 2. However, judging by the comments and logic of UnifiedLog.deleteOldSegments, which is called next, topics with `compact` enabled are also expected to execute deleteLogStartOffsetBreachedSegments.
> {code:java}
> /**
>  * If topic deletion is enabled, delete any local log segments that have either expired due to time based retention
>  * or because the log size is > retentionSize.
>  *
>  * Whether or not deletion is enabled, delete any local log segments that are before the log start offset
>  */
> def deleteOldSegments(): Int = {
>   if (config.delete) {
>     deleteLogStartOffsetBreachedSegments() +
>       deleteRetentionSizeBreachedSegments() +
>       deleteRetentionMsBreachedSegments()
>   } else {
>     deleteLogStartOffsetBreachedSegments()
>   }
> } {code}
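
The interaction described in the two quoted snippets can be reproduced with a small stand-alone model. This is only a sketch: LogConfig, Log, CleanupSketch, sampleLogs and stepsFor are hypothetical stand-ins, not Kafka classes, and the model covers only the path through LogManager.cleanupLogs() as quoted above. It says nothing about other code paths that may call deleteOldSegments (the issue was resolved as Not A Bug).

```scala
// Hypothetical, simplified stand-ins for Kafka's log config and log objects.
final case class LogConfig(compact: Boolean, delete: Boolean)
final case class Log(name: String, config: LogConfig)

object CleanupSketch {
  // Three cleanup policies: "delete", "compact", and "compact,delete".
  val sampleLogs: Map[String, Log] = Map(
    "delete-only"    -> Log("delete-only", LogConfig(compact = false, delete = true)),
    "compact-only"   -> Log("compact-only", LogConfig(compact = true, delete = false)),
    "compact-delete" -> Log("compact-delete", LogConfig(compact = true, delete = true))
  )

  // Models the else-branch of the quoted cleanupLogs() snippet:
  // only logs without `compact` are selected. Per the report, the
  // cleaner != null branch also yields only non-compacted logs.
  def deletableLogs(currentLogs: Map[String, Log]): Map[String, Log] =
    currentLogs.filter { case (_, log) => !log.config.compact }

  // Models the branching in the quoted deleteOldSegments():
  // returns the names of the deletion steps that would run for a log.
  def stepsFor(log: Log): List[String] =
    if (log.config.delete)
      List(
        "deleteLogStartOffsetBreachedSegments",
        "deleteRetentionSizeBreachedSegments",
        "deleteRetentionMsBreachedSegments"
      )
    else
      List("deleteLogStartOffsetBreachedSegments")

  def main(args: Array[String]): Unit = {
    val selected = deletableLogs(sampleLogs).keySet
    sampleLogs.values.foreach { log =>
      val reached = selected.contains(log.name)
      println(s"${log.name}: selected by cleanupLogs=$reached, steps if called=${stepsFor(log)}")
    }
  }
}
```

In this model, stepsFor always includes deleteLogStartOffsetBreachedSegments, yet any log with `compact` set is dropped by deletableLogs before that method could be reached from cleanupLogs(), which is the mismatch the report describes.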
--
This message was sent by Atlassian Jira
(v8.20.7#820007)