You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/12/15 19:14:47 UTC
kafka git commit: KAFKA-4529; LogCleaner should not delete the tombstone too early.
Repository: kafka
Updated Branches:
refs/heads/trunk ea724497a -> a9687bc0d
KAFKA-4529; LogCleaner should not delete the tombstone too early.
cc junrao
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #2260 from becketqin/KAFKA-4529-trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9687bc0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9687bc0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9687bc0
Branch: refs/heads/trunk
Commit: a9687bc0d8ddab27c83f7e6f3e87d9e74e32c8df
Parents: ea72449
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Dec 15 11:14:41 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Dec 15 11:14:41 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 8 +++---
.../scala/unit/kafka/log/LogCleanerTest.scala | 29 ++++++++++++++++++++
2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9687bc0/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c5a73d5..7ddb2c4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -350,15 +350,15 @@ private[log] class Cleaner(val id: Int,
val deleteHorizonMs =
log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
- case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
+ case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
}
// determine the timestamp up to which the log will be cleaned
// this is the lower of the last active segment and the compaction lag
- val cleanableHorizionMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.largestTimestamp).getOrElse(0L)
+ val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
// group the segments and clean the groups
- info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizionMs), new Date(deleteHorizonMs)))
+ info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
@@ -399,7 +399,7 @@ private[log] class Cleaner(val id: Int,
try {
// clean segments into the new destination segment
for (old <- segments) {
- val retainDeletes = old.largestTimestamp > deleteHorizonMs
+ val retainDeletes = old.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
.format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9687bc0/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index a99d4b9..ae8e401 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -708,6 +708,35 @@ class LogCleanerTest extends JUnitSuite {
}
}
+ @Test
+ def testCleanTombstone(): Unit = {
+ val logConfig = LogConfig(new Properties())
+
+ val log = makeLog(config = logConfig)
+ val cleaner = makeCleaner(10)
+
+ // Append a message with a large timestamp.
+ log.append(TestUtils.singletonRecords(value = "0".getBytes,
+ key = "0".getBytes,
+ timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000))
+ log.roll()
+ cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+ // Append a tombstone with a small timestamp and roll out a new log segment.
+ log.append(TestUtils.singletonRecords(value = null,
+ key = "0".getBytes,
+ timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
+ log.roll()
+ cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 1, log.activeSegment.baseOffset))
+ assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowIterator().next().offset())
+ // Append a message and roll out another log segment.
+ log.append(TestUtils.singletonRecords(value = "1".getBytes,
+ key = "1".getBytes,
+ timestamp = time.milliseconds()))
+ log.roll()
+ cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+ assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowIterator().next().offset())
+ }
+
private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset