Posted to commits@kafka.apache.org by ij...@apache.org on 2020/01/31 11:44:41 UTC

[kafka] branch 2.4 updated: KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new f78878e  KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
f78878e is described below

commit f78878e6c1efbd98e9ebeff61979738871a54ffb
Author: Tomislav <to...@gmail.com>
AuthorDate: Tue Jan 14 23:03:44 2020 +0100

    KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
    
    This fix makes the LogCleaner tolerant of gaps in the offset sequence. Previously, such gaps could cause the cleaner to loop endlessly, requiring manual intervention.
    
    Reviewers: Jun Rao <ju...@gmail.com>, David Arthur <mu...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 15 +++++++-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 43 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)
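
For context on the mechanism: the patch pairs each dirty segment with the base offset of the segment that follows it (the last one is paired with the end of the cleanable range), and later fast-forwards the offset map to one before that offset, so a gap at the tail of a segment can no longer stall the cleaning point. The following is a minimal, self-contained Scala sketch of that pairing idea; Segment, withNextStartOffset and GapDemo are illustrative stand-ins, not Kafka's LogSegment or OffsetMap API.

    // A minimal sketch of the pairing built via nextSegmentStartOffsets below;
    // all names here are illustrative, not Kafka's API.
    case class Segment(baseOffset: Long)

    // Pair each dirty segment with its successor's base offset; the last segment
    // is paired with the upper bound of the cleanable range (`end`), mirroring
    // the nextSegmentStartOffsets buffer added by the patch.
    def withNextStartOffset(dirty: Seq[Segment], end: Long): Seq[(Segment, Long)] =
      if (dirty.isEmpty) Seq.empty
      else dirty.zip(dirty.tail.map(_.baseOffset) :+ end)

    object GapDemo extends App {
      // Records exist at offsets 0-9, the next segment starts at 11 (a gap at
      // offset 10), and the cleanable range ends at offset 12.
      val dirty = Seq(Segment(0L), Segment(11L))
      var latestOffset = -1L
      for ((_, nextStart) <- withNextStartOffset(dirty, end = 12L)) {
        // ... the real cleaner scans the segment's records into the map here ...
        // then fast-forwards past any trailing gap, as map.updateLatestOffset does.
        latestOffset = math.max(latestOffset, nextStart - 1L)
      }
      println(latestOffset) // prints 11: the cleaning point can advance past the gap
    }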

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bcb3586..f7babf7 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.collection.{Iterable, Seq, Set, mutable}
 import scala.util.control.ControlThrowable
 
@@ -878,6 +879,11 @@ private[log] class Cleaner(val id: Int,
                                   stats: CleanerStats): Unit = {
     map.clear()
     val dirty = log.logSegments(start, end).toBuffer
+    val nextSegmentStartOffsets = new ListBuffer[Long]
+    if (dirty.nonEmpty) {
+      for (nextSegment <- dirty.tail) nextSegmentStartOffsets.append(nextSegment.baseOffset)
+      nextSegmentStartOffsets.append(end)
+    }
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
 
     val transactionMetadata = new CleanedTransactionMetadata
@@ -887,10 +893,10 @@ private[log] class Cleaner(val id: Int,
     // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var full = false
-    for (segment <- dirty if !full) {
+    for ( (segment, nextSegmentStartOffset) <- dirty.zip(nextSegmentStartOffsets) if !full) {
       checkDone(log.topicPartition)
 
-      full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize,
+      full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, nextSegmentStartOffset, log.config.maxMessageSize,
         transactionMetadata, stats)
       if (full)
         debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
@@ -911,6 +917,7 @@ private[log] class Cleaner(val id: Int,
                                        segment: LogSegment,
                                        map: OffsetMap,
                                        startOffset: Long,
+                                       nextSegmentStartOffset: Long,
                                        maxLogMessageSize: Int,
                                        transactionMetadata: CleanedTransactionMetadata,
                                        stats: CleanerStats): Boolean = {
@@ -964,6 +971,10 @@ private[log] class Cleaner(val id: Int,
       if(position == startPosition)
         growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
     }
+
+    // In the case of an offset gap, fast-forward to the latest expected offset in this segment.
+    map.updateLatestOffset(nextSegmentStartOffset - 1L)
+
     restoreBuffers()
     false
   }
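
The heart of the fix is the new map.updateLatestOffset(nextSegmentStartOffset - 1L) call above: once a segment has been scanned, the offset map's latest offset is advanced to one before the next segment's base offset even if no record exists at those offsets, which is what lets the cleaning checkpoint move past the gap. The toy Scala map below shows that semantics in isolation; ToyOffsetMap only loosely imitates the real kafka.log.OffsetMap.

    import scala.collection.mutable

    // A toy stand-in for the offset map (illustrative only, not kafka.log.OffsetMap).
    class ToyOffsetMap {
      private val offsets = mutable.Map.empty[String, Long]
      private var latest = -1L

      def put(key: String, offset: Long): Unit = {
        offsets(key) = offset
        latest = math.max(latest, offset)
      }

      // Advance the high-water mark without inserting a key; this models the
      // fast-forward that lets cleaning move past a trailing offset gap.
      def updateLatestOffset(offset: Long): Unit =
        latest = math.max(latest, offset)

      def latestOffset: Long = latest
    }

    object ToyOffsetMapDemo extends App {
      val map = new ToyOffsetMap
      map.put("k1", 8L)
      map.put("k2", 9L)                 // last real record in the segment is offset 9
      map.updateLatestOffset(11L - 1L)  // next segment starts at 11: fast-forward to 10
      println(map.latestOffset)         // prints 10, so cleaning resumes at 11
    }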
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 18d86b6..644822c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1561,6 +1561,49 @@ class LogCleanerTest {
     assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
   }
 
+  /**
+   * Verify that the cleaner is able to move beyond missing offset records in the dirty log
+   */
+  @Test
+  def testCleaningBeyondMissingOffsets(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024*1024: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    {
+      val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
+      writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+      // roll a new segment with baseOffset 11, leaving the previous segment with a gap at offset 10
+      log.roll(Some(11L))
+
+      // active segment record
+      log.appendAsFollower(messageWithOffset(1015, 1015, 11L))
+
+      val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
+      assertEquals("Cleaning point should pass offset gap", log.activeSegment.baseOffset, nextDirtyOffset)
+    }
+
+
+    {
+      val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
+      writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+      // roll a new segment with baseOffset 15, leaving the previous segment with a gap in offset range [10, 14]
+      log.roll(Some(15L))
+
+      writeToLog(log, (15 to 24) zip (15 to 24), (15L to 24L))
+      // roll a new segment with baseOffset 30, leaving the previous segment with a gap in offset range [25, 29]
+      log.roll(Some(30L))
+
+      // active segment record
+      log.appendAsFollower(messageWithOffset(1015, 1015, 30L))
+
+      val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
+      assertEquals("Cleaning point should pass offset gap in multiple segments", log.activeSegment.baseOffset, nextDirtyOffset)
+    }
+  }
+
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset