You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/12/14 17:46:05 UTC
kafka git commit: KAFKA-4529;
Fix the issue that tombstone can be deleted too early;
patched by Jiangjie Qin;
reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/0.10.1 1ba2ee80e -> 395e85412
KAFKA-4529; Fix the issue that tombstone can be deleted too early; patched by Jiangjie Qin <ji...@linkedin.com>; reviewed by Jun Rao <ju...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/395e8541
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/395e8541
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/395e8541
Branch: refs/heads/0.10.1
Commit: 395e85412fc3881c9f28c8c5382459236a1d4670
Parents: 1ba2ee8
Author: Jiangjie Qin <ji...@linkedin.com>
Authored: Wed Dec 14 09:16:11 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Dec 14 09:18:18 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 6 ++--
.../test/scala/unit/kafka/log/CleanerTest.scala | 29 ++++++++++++++++++++
2 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/395e8541/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index e0b0bb8..48bf931 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -351,12 +351,12 @@ private[log] class Cleaner(val id: Int,
val deleteHorizonMs =
log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
- case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
+ case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
}
// determine the timestamp up to which the log will be cleaned
// this is the lower of the last active segment and the compaction lag
- val cleanableHorizionMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.largestTimestamp).getOrElse(0L)
+ val cleanableHorizionMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
// group the segments and clean the groups
info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizionMs), new Date(deleteHorizonMs)))
@@ -398,7 +398,7 @@ private[log] class Cleaner(val id: Int,
try {
// clean segments into the new destination segment
for (old <- segments) {
- val retainDeletes = old.largestTimestamp > deleteHorizonMs
+ val retainDeletes = old.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
.format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/395e8541/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 536f10d..221cb0e 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -673,6 +673,35 @@ class CleanerTest extends JUnitSuite {
assertEquals(offset, value)
}
}
+
+ @Test
+ def testCleanTombstone(): Unit = {
+ val logConfig = LogConfig(new Properties())
+
+ val log = makeLog(config = logConfig)
+ val cleaner = makeCleaner(10)
+
+ // Append a message with a large timestamp.
+ log.append(TestUtils.singleMessageSet(payload = "0".getBytes,
+ key = "0".getBytes,
+ timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000))
+ log.roll()
+ cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+ // Append a tombstone with a small timestamp and roll out a new log segment.
+ log.append(TestUtils.singleMessageSet(payload = null,
+ key = "0".getBytes,
+ timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
+ log.roll()
+ cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 1, log.activeSegment.baseOffset))
+ assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.head.offset)
+ // Append a message and roll out another log segment.
+ log.append(TestUtils.singleMessageSet(payload = "1".getBytes,
+ key = "1".getBytes,
+ timestamp = time.milliseconds()))
+ log.roll()
+ cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+ assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.head.offset)
+ }
private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for(((key, value), offset) <- keysAndValues.zip(offsetSeq))