You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/21 19:43:36 UTC
kafka git commit: KAFKA-3587 [Backport]: LogCleaner fails due to incorrect offset map computation
Repository: kafka
Updated Branches:
refs/heads/0.9.0 df562ad21 -> 3619ad393
KAFKA-3587 [Backport]: LogCleaner fails due to incorrect offset map computation
Author: Ivan Dyachkov <iv...@klarna.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1818 from id/backport-KAFKA-3587
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3619ad39
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3619ad39
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3619ad39
Branch: refs/heads/0.9.0
Commit: 3619ad3935962b8cbbe8077c70956ffed2cbffbb
Parents: df562ad
Author: Ivan Dyachkov <iv...@klarna.com>
Authored: Wed Dec 21 11:43:30 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Dec 21 11:43:30 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 27 ++++++++++-----
.../test/scala/unit/kafka/log/CleanerTest.scala | 35 ++++++++++++++++++--
2 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3619ad39/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d5c247c..1fd2e3f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -575,17 +575,19 @@ private[log] class Cleaner(val id: Int,
// but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
var offset = dirty.head.baseOffset
require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
- val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
var full = false
for (segment <- dirty if !full) {
checkDone(log.topicAndPartition)
- val segmentSize = segment.nextOffset() - segment.baseOffset
- require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
- if (map.size + segmentSize <= maxDesiredMapSize)
- offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
- else
+ val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+ if (newOffset > -1L)
+ offset = newOffset
+ else {
+ // If not even one segment can fit in the map, compaction cannot happen
+ require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
+ debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
full = true
+ }
}
info("Offset map for log %s complete.".format(log.name))
offset
@@ -597,11 +599,12 @@ private[log] class Cleaner(val id: Int,
* @param segment The segment to index
* @param map The map in which to store the key=>offset mapping
*
- * @return The final offset covered by the map
+ * @return The final offset covered by the map or -1 if the map is full
*/
private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
var position = 0
var offset = segment.baseOffset
+ val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
while (position < segment.log.sizeInBytes) {
checkDone(topicAndPartition)
readBuffer.clear()
@@ -610,8 +613,14 @@ private[log] class Cleaner(val id: Int,
val startPosition = position
for (entry <- messages) {
val message = entry.message
- if (message.hasKey)
- map.put(message.key, entry.offset)
+ if (message.hasKey) {
+ if (map.size < maxDesiredMapSize)
+ map.put(message.key, entry.offset)
+ else {
+ // The map is full, stop looping and return
+ return -1L
+ }
+ }
offset = entry.offset
stats.indexMessagesRead(1)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3619ad39/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8ab9f91..6d8a7ba 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -422,8 +422,33 @@ class CleanerTest extends JUnitSuite {
recoverAndCheck(config, cleanedKeys)
}
-
-
+
+ @Test
+ def testBuildOffsetMapFakeLarge() {
+ val map = new FakeOffsetMap(1000)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
+ logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ val logConfig = LogConfig(logProps)
+ val log = makeLog(config = logConfig)
+ val cleaner = makeCleaner(Int.MaxValue)
+ val start = 0
+ val end = 2
+ val offsetSeq = Seq(0L, 7206178L)
+ writeToLog(log, (start until end) zip (start until end), offsetSeq)
+ val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+ assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
+ assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
+ assertEquals("Map should contain first value", 0L, map.get(key(0)))
+ assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+ }
+
+ private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
+ for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
+ yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+ }
+
def makeLog(dir: File = dir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
@@ -449,6 +474,10 @@ class CleanerTest extends JUnitSuite {
def message(key: Int, value: Int) =
new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+ def messageWithOffset(key: Int, value: Int, offset: Long) =
+ new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(offset),
+ new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+
def unkeyedMessage(value: Int) =
new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
@@ -478,4 +507,4 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
def size: Int = map.size
-}
\ No newline at end of file
+}