You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/22 20:19:00 UTC

[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

    [ https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301904#comment-16301904 ] 

ASF GitHub Bot commented on KAFKA-3587:
---------------------------------------

guozhangwang closed pull request #1818: Backport KAFKA-3587
URL: https://github.com/apache/kafka/pull/1818
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d5c247cab95..1fd2e3f8931 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -575,17 +575,19 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      val segmentSize = segment.nextOffset() - segment.baseOffset
 
-      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize,  log.name, segment.log.file.getName, maxDesiredMapSize))
-      if (map.size + segmentSize <= maxDesiredMapSize)
-        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      else
+      val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+      if (newOffset > -1L)
+        offset = newOffset
+      else {
+        // If not even one segment can fit in the map, compaction cannot happen
+        require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
+        debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
         full = true
+      }
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
@@ -597,11 +599,12 @@ private[log] class Cleaner(val id: Int,
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
    *
-   * @return The final offset covered by the map
+   * @return The final offset covered by the map or -1 if the map is full
    */
   private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
     var position = 0
     var offset = segment.baseOffset
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
       readBuffer.clear()
@@ -610,8 +613,14 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        if (message.hasKey)
-          map.put(message.key, entry.offset)
+        if (message.hasKey) {
+          if (map.size < maxDesiredMapSize)
+            map.put(message.key, entry.offset)
+          else {
+            // The map is full, stop looping and return
+            return -1L
+          }
+        }
         offset = entry.offset
         stats.indexMessagesRead(1)
       }
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8ab9f91e82d..6d8a7bad227 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -422,8 +422,33 @@ class CleanerTest extends JUnitSuite {
     recoverAndCheck(config, cleanedKeys)
     
   }
-  
-  
+
+  @Test
+  def testBuildOffsetMapFakeLarge() {
+    val map = new FakeOffsetMap(1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 2
+    val offsetSeq = Seq(0L, 7206178L)
+    writeToLog(log, (start until end) zip (start until end), offsetSeq)
+    val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
+    assertEquals("Should have the expected number of messages in the map.", end - start, map.size)
+    assertEquals("Map should contain first value", 0L, map.get(key(0)))
+    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+  }
+
+  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
+    for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
+      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+  }
+
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
@@ -449,6 +474,10 @@ class CleanerTest extends JUnitSuite {
   def message(key: Int, value: Int) = 
     new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
 
+  def messageWithOffset(key: Int, value: Int, offset: Long) = 
+    new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(offset),
+      new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+
   def unkeyedMessage(value: Int) =
     new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
 
@@ -478,4 +507,4 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
   
   def size: Int = map.size
   
-}
\ No newline at end of file
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> LogCleaner fails due to incorrect offset map computation on a replica
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3587
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3587
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>         Environment: Linux
>            Reporter: Kiran Pillarisetty
>            Assignee: Edoardo Comar
>             Fix For: 0.9.0.2, 0.10.0.0
>
>         Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner computes segment size by subtracting segment's base offset from the latest offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This works fine until you create another replica. When you create a replica, it's segment could contain data which is already compacted on other brokers. Depending up on the type of data, offset difference could be too big, larger than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=300000
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic test.log.compact.1M --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config segment.ms=300000
> 2. Use kafka-console-producer.sh to produce a single message with the following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 00000000000000000000.log  --print-data-log
> Dumping 00000000000000000000.log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in segment test.log.compact.1M-0/00000000000000000000.log but offset map can fit only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
>         at scala.Predef$.require(Predef.scala:219)
>         at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
>         at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
>         at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
>         at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:322)
>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 messages in that segment (as per the above error)
> Error stems from this line in LogCleaner.scala:
> """val segmentSize = segment.nextOffset() - segment.baseOffset"""
> In Replica's log segment file ( 00000000000000000000.log), ending offset is 7206178. Beginning offset is 0.  That makes Log Cleaner think that there are 7206179 messages in that segment although there are only 218418 messages in it.
> IMO,  to address this kind of scenario, LogCleaner.scala should check for the number of messages in the segment, instead of subtracting beginning offset from the ending offset.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)