You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/06/26 15:07:23 UTC
kafka git commit: KAFKA-5413; Port fix to 0.10.2 branch
Repository: kafka
Updated Branches:
refs/heads/0.10.2 8af61a3a9 -> 3f149ad46
KAFKA-5413; Port fix to 0.10.2 branch
Port KAFKA-5413 to the 0.10.2 branch
Author: Kelvin Rutt <ru...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #3397 from kelvinrutt/kafka_5413_port
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f149ad4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f149ad4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f149ad4
Branch: refs/heads/0.10.2
Commit: 3f149ad46aa667263ed9601173638b2dc0c6ba30
Parents: 8af61a3
Author: Kelvin Rutt <ru...@gmail.com>
Authored: Mon Jun 26 08:07:08 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jun 26 08:07:08 2017 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 28 ++++++++-
.../scala/unit/kafka/log/LogCleanerTest.scala | 60 +++++++++++++++++---
2 files changed, 77 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f149ad4/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 6c1f13d..88c1eeb 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -359,7 +359,7 @@ private[log] class Cleaner(val id: Int,
// group the segments and clean the groups
info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
- for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
+ for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset))
cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
// record buffer utilization
@@ -551,7 +551,7 @@ private[log] class Cleaner(val id: Int,
*
* @return A list of grouped segments
*/
- private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
+ private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int, firstUncleanableOffset: Long): List[Seq[LogSegment]] = {
var grouped = List[List[LogSegment]]()
var segs = segments.toList
while(segs.nonEmpty) {
@@ -564,7 +564,7 @@ private[log] class Cleaner(val id: Int,
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
- segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
+ lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
group = segs.head :: group
logSize += segs.head.size
indexSize += segs.head.index.sizeInBytes
@@ -577,6 +577,28 @@ private[log] class Cleaner(val id: Int,
}
/**
+ * Estimate the last offset of the first log segment in segs.
+ * LogSegment.nextOffset() - 1 gives the exact last offset in a segment, but computing it can be expensive
+ * because it requires scanning the segment from the last index entry.
+ * Instead, we bound the last offset of the first segment by the base offset of the next segment in the
+ * list minus one. If there is no next segment, the first uncleanable offset minus one is used.
+ *
+ * @param segs remaining segments to group
+ * @param firstUncleanableOffset the log's first uncleanable offset; bounds the last segment in segs
+ * @return the estimated (worst-case) last offset of the first segment in segs
+ */
+ private def lastOffsetForFirstSegment(segs: List[LogSegment], firstUncleanableOffset: Long): Long = {
+   // Pattern match instead of segs.size: List.size is O(n), and this is called for every step of
+   // groupSegmentsBySize's loop over the remaining segments, which would make grouping quadratic.
+   segs match {
+     /* if there is a next segment, use its base offset as the bounding offset to guarantee we know
+      * the worst case offset */
+     case _ :: next :: _ => next.baseOffset - 1
+     // for the last (or only) segment in the list, use the first uncleanable offset.
+     case _ => firstUncleanableOffset - 1
+   }
+ }
+
+ /**
* Build a map of key_hash => offset for the keys in the cleanable dirty portion of the log to use in cleaning.
* @param log The log to use
* @param start The offset at which dirty messages begin
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f149ad4/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 40691b9..82a64b2 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -381,17 +381,17 @@ class LogCleanerTest extends JUnitSuite {
}
// grouping by very large values should result in a single group with all the segments in it
- var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset )
assertEquals(1, groups.size)
assertEquals(log.numberOfSegments, groups.head.size)
checkSegmentOrder(groups)
// grouping by very small values should result in all groups having one entry
- groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue, log.logEndOffset )
assertEquals(log.numberOfSegments, groups.size)
assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
checkSegmentOrder(groups)
- groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1, log.logEndOffset)
assertEquals(log.numberOfSegments, groups.size)
assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
checkSegmentOrder(groups)
@@ -400,13 +400,13 @@ class LogCleanerTest extends JUnitSuite {
// check grouping by log size
val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
- groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue, log.logEndOffset)
checkSegmentOrder(groups)
assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
// check grouping by index size
val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
- groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize, log.logEndOffset )
checkSegmentOrder(groups)
assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
}
@@ -438,14 +438,14 @@ class LogCleanerTest extends JUnitSuite {
assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
// grouping should result in a single group with maximum relative offset of Int.MaxValue
- var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset )
assertEquals(1, groups.size)
// append another message, making last offset of second segment > Int.MaxValue
log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
// grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
- groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset )
assertEquals(2, groups.size)
checkSegmentOrder(groups)
@@ -453,7 +453,7 @@ class LogCleanerTest extends JUnitSuite {
while (log.numberOfSegments < 4)
log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
- groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset )
assertEquals(log.numberOfSegments - 1, groups.size)
for (group <- groups)
assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
@@ -466,6 +466,50 @@ class LogCleanerTest extends JUnitSuite {
}
/**
+ * After a log segment is loaded with a zero-sized index file, index.lastOffset
+ * returns the segment's base offset, even though the log file may contain records
+ * with much larger offsets. Relying on index.lastOffset could therefore make the
+ * log cleaner group together segments spanning an offset range greater than
+ * Int.MaxValue. This test reproduces that scenario and verifies that the segments
+ * are grouped correctly.
+ @Test
+ def testSegmentGroupingFollowingLoadOfZeroIndex(): Unit = {
+ val cleaner = makeCleaner(Int.MaxValue)
+
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 400: java.lang.Integer)
+
+ //mimic the effect of loading an empty index file
+ // NOTE(review): presumably an index interval equal to the segment size means the few small
+ // appends below add no index entries, so index.lastOffset stays at the base offset - confirm.
+ logProps.put(LogConfig.IndexIntervalBytesProp, 400: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ // First segment: records at explicit offsets 0 and 1 (assignOffsets = false keeps the given offsets).
+ val record1 = MemoryRecords.withLogEntries(LogEntry.create(0, Record.create("hello".getBytes, "hello".getBytes)))
+ log.append(record1, assignOffsets = false)
+ val record2 = MemoryRecords.withLogEntries(LogEntry.create(1, Record.create("hello".getBytes, "hello".getBytes)))
+ log.append(record2, assignOffsets = false)
+
+ log.roll(Int.MaxValue/2) // starting a new log segment at offset Int.MaxValue/2
+
+ // Second segment: the second record's offset (Int.MaxValue + 1) is more than Int.MaxValue past offset 0.
+ val record3 = MemoryRecords.withLogEntries(LogEntry.create(Int.MaxValue/2, Record.create("hello".getBytes, "hello".getBytes)))
+ log.append(record3, assignOffsets = false)
+ val record4 = MemoryRecords.withLogEntries(LogEntry.create(Int.MaxValue.toLong + 1, Record.create("hello".getBytes, "hello".getBytes)))
+ log.append(record4, assignOffsets = false)
+
+ // Precondition checks: the real offset range exceeds Int.MaxValue, while the last segment's
+ // index.lastOffset under-reports it (the condition that used to cause incorrect grouping).
+ assertTrue("Actual offset range should be > Int.MaxValue", log.logEndOffset - 1 - log.logStartOffset > Int.MaxValue)
+ assertTrue("index.lastOffset is reporting the wrong last offset", log.logSegments.last.index.lastOffset - log.logStartOffset <= Int.MaxValue)
+
+ // grouping should result in two groups because the second segment takes the offset range > MaxInt
+ val groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
+ assertEquals(2, groups.size)
+
+ // Check each group's true offset span using nextOffset() rather than index.lastOffset.
+ for (group <- groups)
+ assertTrue("Relative offset greater than Int.MaxValue", group.last.nextOffset() - 1 - group.head.baseOffset <= Int.MaxValue)
+ checkSegmentOrder(groups)
+ }
+
+ /**
* Test building an offset map off the log
*/
@Test