You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/12/14 17:46:05 UTC

kafka git commit: KAFKA-4529; Fix the issue that tombstone can be deleted too early; patched by Jiangjie Qin ; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 1ba2ee80e -> 395e85412


KAFKA-4529; Fix the issue that tombstone can be deleted too early; patched by Jiangjie Qin <ji...@linkedin.com>; reviewed by Jun Rao <ju...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/395e8541
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/395e8541
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/395e8541

Branch: refs/heads/0.10.1
Commit: 395e85412fc3881c9f28c8c5382459236a1d4670
Parents: 1ba2ee8
Author: Jiangjie Qin <ji...@linkedin.com>
Authored: Wed Dec 14 09:16:11 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Dec 14 09:18:18 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  |  6 ++--
 .../test/scala/unit/kafka/log/CleanerTest.scala | 29 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/395e8541/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index e0b0bb8..48bf931 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -351,12 +351,12 @@ private[log] class Cleaner(val id: Int,
     val deleteHorizonMs = 
       log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
         case None => 0L
-        case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
+        case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
     }
 
     // determine the timestamp up to which the log will be cleaned
     // this is the lower of the last active segment and the compaction lag
-    val cleanableHorizionMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.largestTimestamp).getOrElse(0L)
+    val cleanableHorizionMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
 
     // group the segments and clean the groups
     info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizionMs), new Date(deleteHorizonMs)))
@@ -398,7 +398,7 @@ private[log] class Cleaner(val id: Int,
     try {
       // clean segments into the new destination segment
       for (old <- segments) {
-        val retainDeletes = old.largestTimestamp > deleteHorizonMs
+        val retainDeletes = old.lastModified > deleteHorizonMs
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
             .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
         cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/395e8541/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 536f10d..221cb0e 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -673,6 +673,35 @@ class CleanerTest extends JUnitSuite {
       assertEquals(offset, value)
     }
   }
+  
+  @Test
+  def testCleanTombstone(): Unit = { // KAFKA-4529: a tombstone must survive cleaning even when its own record timestamp is old
+    val logConfig = LogConfig(new Properties())
+
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(10)
+    
+    // Key "0" with a timestamp deleteRetentionMs + 10000 ms in the future; roll so it leaves the active segment, then clean.
+    log.append(TestUtils.singleMessageSet(payload = "0".getBytes,
+                                          key = "0".getBytes,
+                                          timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000))
+    log.roll()
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+    // Tombstone (null payload) for key "0", timestamped deleteRetentionMs + 10000 ms in the past; roll, then clean from offset 1.
+    log.append(TestUtils.singleMessageSet(payload = null, 
+                                          key = "0".getBytes, 
+                                          timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
+    log.roll()
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 1, log.activeSegment.baseOffset))
+    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.head.offset) // offset 1 (the tombstone) must be the first entry left
+    // Key "1" at the current time; after rolling, cleaning from offset 2 treats the tombstone's offset as already clean.
+    log.append(TestUtils.singleMessageSet(payload = "1".getBytes,
+                                          key = "1".getBytes,
+                                          timestamp = time.milliseconds()))
+    log.roll()
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.head.offset) // retention is decided from segment lastModified, not the tombstone's old record timestamp
+  }
 
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))