You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/17 23:21:30 UTC

kafka git commit: KAFKA-3330; Truncate log cleaner offset checkpoint if the log is truncated

Repository: kafka
Updated Branches:
  refs/heads/trunk 61281f5c5 -> 579d473ce


KAFKA-3330; Truncate log cleaner offset checkpoint if the log is truncated

becketqin Can you take a look?

Author: Dong Lin <li...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #1009 from lindong28/KAFKA-3330


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/579d473c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/579d473c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/579d473c

Branch: refs/heads/trunk
Commit: 579d473ce9c5ef1a442af734e362dd545e5ab988
Parents: 61281f5
Author: Dong Lin <li...@gmail.com>
Authored: Thu Mar 17 15:21:20 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 17 15:21:20 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala        |  7 +++++++
 core/src/main/scala/kafka/log/LogCleanerManager.scala | 12 ++++++++++++
 core/src/main/scala/kafka/log/LogManager.scala        |  8 ++++++--
 3 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/579d473c/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index a2e1913..e23234b 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -134,6 +134,13 @@ class LogCleaner(val config: CleanerConfig,
   }
 
   /**
+   * Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset
+   */
+  def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
+    cleanerManager.maybeTruncateCheckpoint(dataDir, topicAndPartition, offset)
+  }
+
+  /**
    *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
    *  This call blocks until the cleaning of the partition is aborted and paused.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/579d473c/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f6795d3..f92db4e 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -210,6 +210,18 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     }
   }
 
+  def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
+    inLock(lock) {
+      if (logs.get(topicAndPartition).config.compact) {
+        val checkpoint = checkpoints(dataDir)
+        val existing = checkpoint.read()
+
+        if (existing.getOrElse(topicAndPartition, 0L) > offset)
+          checkpoint.write(existing + (topicAndPartition -> offset))
+      }
+    }
+  }
+
   /**
    * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/579d473c/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index b64fac6..749c622 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -286,8 +286,10 @@ class LogManager(val logDirs: Array[File],
         if (needToStopCleaner && cleaner != null)
           cleaner.abortAndPauseCleaning(topicAndPartition)
         log.truncateTo(truncateOffset)
-        if (needToStopCleaner && cleaner != null)
+        if (needToStopCleaner && cleaner != null) {
+          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset)
           cleaner.resumeCleaning(topicAndPartition)
+        }
       }
     }
     checkpointRecoveryPointOffsets()
@@ -305,8 +307,10 @@ class LogManager(val logDirs: Array[File],
       if (cleaner != null)
         cleaner.abortAndPauseCleaning(topicAndPartition)
       log.truncateFullyAndStartAt(newOffset)
-      if (cleaner != null)
+      if (cleaner != null) {
+        cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset)
         cleaner.resumeCleaning(topicAndPartition)
+      }
     }
     checkpointRecoveryPointOffsets()
   }