Posted to commits@kafka.apache.org by ju...@apache.org on 2016/12/15 19:14:47 UTC

kafka git commit: KAFKA-4529; LogCleaner should not delete the tombstone too early.

Repository: kafka
Updated Branches:
  refs/heads/trunk ea724497a -> a9687bc0d


KAFKA-4529; LogCleaner should not delete the tombstone too early.

cc junrao

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #2260 from becketqin/KAFKA-4529-trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9687bc0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9687bc0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9687bc0

Branch: refs/heads/trunk
Commit: a9687bc0d8ddab27c83f7e6f3e87d9e74e32c8df
Parents: ea72449
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Dec 15 11:14:41 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Dec 15 11:14:41 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  |  8 +++---
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 29 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a9687bc0/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c5a73d5..7ddb2c4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -350,15 +350,15 @@ private[log] class Cleaner(val id: Int,
     val deleteHorizonMs = 
       log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
         case None => 0L
-        case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
+        case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
     }
 
     // determine the timestamp up to which the log will be cleaned
     // this is the lower of the last active segment and the compaction lag
-    val cleanableHorizionMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.largestTimestamp).getOrElse(0L)
+    val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
 
     // group the segments and clean the groups
-    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizionMs), new Date(deleteHorizonMs)))
+    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
     for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
       cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
 
@@ -399,7 +399,7 @@ private[log] class Cleaner(val id: Int,
     try {
       // clean segments into the new destination segment
       for (old <- segments) {
-        val retainDeletes = old.largestTimestamp > deleteHorizonMs
+        val retainDeletes = old.lastModified > deleteHorizonMs
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
             .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
         cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
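
The substantive change in the hunks above is the clock used for the tombstone
retention window. The delete horizon and the per-segment retainDeletes check
previously compared segments' largest *message* timestamp, but message
timestamps are client-supplied and need not track wall-clock time, so a
tombstone carrying a stale timestamp could fall behind the horizon and be
discarded on its very first compaction, before consumers had
deleteRetentionMs to observe it. Comparing the segment file's modification
time instead ties the window to when the cleaner last rewrote the segment.
A minimal, self-contained Scala sketch of the two comparisons (Seg and the
constants are illustrative stand-ins, not the Kafka types):

object TombstoneHorizonSketch {
  // Hypothetical stand-in for kafka.log.LogSegment, reduced to the two
  // timestamps this commit cares about.
  case class Seg(largestTimestamp: Long, lastModified: Long)

  def main(args: Array[String]): Unit = {
    val now       = System.currentTimeMillis()
    val retention = 86400000L // stand-in for log.config.deleteRetentionMs (24h)

    // Delete horizon derived from the last previously-cleaned segment, which
    // the cleaner rewrote just now. (Its largestTimestamp and lastModified
    // coincide here, so the horizon is the same under either version.)
    val lastCleaned = Seg(largestTimestamp = now, lastModified = now)
    val horizon     = lastCleaned.lastModified - retention

    // A segment holding a tombstone appended moments ago, but stamped with
    // an old, client-supplied timestamp.
    val tombstoneSeg = Seg(largestTimestamp = now - 2 * retention, lastModified = now)

    // Old comparison: the stale message timestamp makes the fresh tombstone
    // eligible for deletion immediately.
    println(s"old retainDeletes: ${tombstoneSeg.largestTimestamp > horizon}") // false
    // New comparison: the file mtime guarantees at least `retention` ms of
    // wall-clock survival after the segment was last written.
    println(s"new retainDeletes: ${tombstoneSeg.lastModified > horizon}")     // true
  }
}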

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9687bc0/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index a99d4b9..ae8e401 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -708,6 +708,35 @@ class LogCleanerTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testCleanTombstone(): Unit = {
+    val logConfig = LogConfig(new Properties())
+
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(10)
+
+    // Append a message with a large timestamp.
+    log.append(TestUtils.singletonRecords(value = "0".getBytes,
+                                          key = "0".getBytes,
+                                          timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000))
+    log.roll()
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+    // Append a tombstone with a small timestamp and roll out a new log segment.
+    log.append(TestUtils.singletonRecords(value = null,
+                                          key = "0".getBytes,
+                                          timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
+    log.roll()
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 1, log.activeSegment.baseOffset))
+    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowIterator().next().offset())
+    // Append a message and roll out another log segment.
+    log.append(TestUtils.singletonRecords(value = "1".getBytes,
+                                          key = "1".getBytes,
+                                          timestamp = time.milliseconds()))
+    log.roll()
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowIterator().next().offset())
+  }
+
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
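
Worked through with the test's values (writing T for time.milliseconds() at
append time and R for logConfig.deleteRetentionMs; the literals below are
illustrative assumptions, not test output): under the old code, the first
record's timestamp of T + R + 10000 pushes the delete horizon to T + 10000
after the first clean, so the tombstone's segment (largest message timestamp
T - R - 10000) fails retainDeletes and the tombstone is dropped on the second
clean. With file modification times, every segment was written at roughly T,
so the tombstone at offset 1 survives both subsequent cleans, which is what
the two assertions verify.

object TestTimelineSketch extends App {
  val T = 1481829281000L // hypothetical time.milliseconds() at append time
  val R = 86400000L      // default deleteRetentionMs (24h)

  // Old code: horizon comes from the message timestamp of the cleaned segment.
  val horizonOld = (T + R + 10000) - R // = T + 10000
  println(s"old code keeps the tombstone: ${(T - R - 10000) > horizonOld}") // false

  // New code: horizon comes from file mtimes, which are all roughly T.
  val horizonNew = T - R
  println(s"new code keeps the tombstone: ${T > horizonNew}")               // true
}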