Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/21 19:43:36 UTC

kafka git commit: KAFKA-3587 [Backport]: LogCleaner fails due to incorrect offset map computation

Repository: kafka
Updated Branches:
  refs/heads/0.9.0 df562ad21 -> 3619ad393


KAFKA-3587 [Backport]: LogCleaner fails due to incorrect offset map computation

Author: Ivan Dyachkov <iv...@klarna.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1818 from id/backport-KAFKA-3587


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3619ad39
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3619ad39
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3619ad39

Branch: refs/heads/0.9.0
Commit: 3619ad3935962b8cbbe8077c70956ffed2cbffbb
Parents: df562ad
Author: Ivan Dyachkov <iv...@klarna.com>
Authored: Wed Dec 21 11:43:30 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Dec 21 11:43:30 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 27 ++++++++++-----
 .../test/scala/unit/kafka/log/CleanerTest.scala | 35 ++++++++++++++++++--
 2 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3619ad39/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d5c247c..1fd2e3f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -575,17 +575,19 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      val segmentSize = segment.nextOffset() - segment.baseOffset
 
-      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize,  log.name, segment.log.file.getName, maxDesiredMapSize))
-      if (map.size + segmentSize <= maxDesiredMapSize)
-        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      else
+      val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+      if (newOffset > -1L)
+        offset = newOffset
+      else {
+        // If not even one segment can fit in the map, compaction cannot happen
+        require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
+        debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
         full = true
+      }
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
@@ -597,11 +599,12 @@ private[log] class Cleaner(val id: Int,
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
    *
-   * @return The final offset covered by the map
+   * @return The final offset covered by the map or -1 if the map is full
    */
   private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
     var position = 0
     var offset = segment.baseOffset
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
       readBuffer.clear()
@@ -610,8 +613,14 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        if (message.hasKey)
-          map.put(message.key, entry.offset)
+        if (message.hasKey) {
+          if (map.size < maxDesiredMapSize)
+            map.put(message.key, entry.offset)
+          else {
+            // The map is full, stop looping and return
+            return -1L
+          }
+        }
         offset = entry.offset
         stats.indexMessagesRead(1)
       }

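The hunks above replace the old up-front size check, which estimated a segment's entry count as segment.nextOffset() - segment.baseOffset, with a per-entry check inside buildOffsetMapForSegment. On a compacted log the offset span of a segment can far exceed the number of messages it still holds, so the old require could fail even when the offset map had ample room. A minimal, self-contained sketch of that mismatch (not Kafka code), using the same offsets as the new test below; the 1000-entry cap is an arbitrary stand-in for map.slots * dupBufferLoadFactor:

    object OffsetSpanVsEntryCount {
      // Two messages survive compaction, at the offsets used by testBuildOffsetMapFakeLarge.
      val offsets = Seq(0L, 7206178L)

      def main(args: Array[String]): Unit = {
        val offsetSpan = offsets.max - offsets.min + 1 // what the removed require measured
        val entryCount = offsets.size                  // what the offset map actually stores
        val maxDesiredMapSize = 1000                   // stand-in for map.slots * dupBufferLoadFactor

        println(s"offset span = $offsetSpan, entries = $entryCount, cap = $maxDesiredMapSize")
        // Old check: offsetSpan (7206179) > 1000, so cleaning aborted with an exception.
        // New check: entryCount (2) <= 1000, so both keys fit and cleaning proceeds.
      }
    }
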
http://git-wip-us.apache.org/repos/asf/kafka/blob/3619ad39/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8ab9f91..6d8a7ba 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -422,8 +422,33 @@ class CleanerTest extends JUnitSuite {
     recoverAndCheck(config, cleanedKeys)
     
   }
-  
-  
+
+  @Test
+  def testBuildOffsetMapFakeLarge() {
+    val map = new FakeOffsetMap(1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 2
+    val offsetSeq = Seq(0L, 7206178L)
+    writeToLog(log, (start until end) zip (start until end), offsetSeq)
+    val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
+    assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
+    assertEquals("Map should contain first value", 0L, map.get(key(0)))
+    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+  }
+
+  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
+    for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
+      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+  }
+
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
@@ -449,6 +474,10 @@ class CleanerTest extends JUnitSuite {
   def message(key: Int, value: Int) = 
     new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
 
+  def messageWithOffset(key: Int, value: Int, offset: Long) = 
+    new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(offset),
+      new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+
   def unkeyedMessage(value: Int) =
     new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
 
@@ -478,4 +507,4 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
   
   def size: Int = map.size
   
-}
\ No newline at end of file
+}
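
The new test exercises this path end to end: it appends two keyed messages at offsets 0 and 7206178 into a 1000-slot FakeOffsetMap and verifies both keys are mapped, which the pre-patch size check would have rejected. For reference, the shape of the patched loop, reduced to a self-contained sketch with a plain mutable.Map standing in for Kafka's OffsetMap (names here are illustrative, not Kafka's):

    import scala.collection.mutable

    // Reduced sketch of the pattern the patch introduces: fill the map entry by
    // entry against a hard cap, and return -1 so the caller can stop cleanly at
    // the last fully mapped segment instead of failing up front.
    def buildMapForEntries(entries: Seq[(String, Long)],
                           map: mutable.Map[String, Long],
                           maxDesiredMapSize: Int): Long = {
      var lastOffset = -1L
      for ((key, offset) <- entries) {
        if (map.size < maxDesiredMapSize)
          map.put(key, offset)
        else
          return -1L // map is full; the caller keeps the last fully mapped offset
        lastOffset = offset
      }
      lastOffset // final offset covered by the map
    }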