You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/01/31 12:01:10 UTC

[kafka] branch 2.2 updated: KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new aa6d325  KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
aa6d325 is described below

commit aa6d325e40509504fad84b69047371c9b9cccf9f
Author: Tomislav <to...@gmail.com>
AuthorDate: Tue Jan 14 23:03:44 2020 +0100

    KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
    
    This fix makes the LogCleaner tolerant of gaps in the offset sequence. Previously, this could lead to endless loops of cleaning which required manual intervention.
    
    Reviewers: Jun Rao <ju...@gmail.com>, David Arthur <mu...@gmail.com>
---
 .../kafka/connect/transforms/ValueToKeyTest.java   |  1 -
 core/src/main/scala/kafka/log/LogCleaner.scala     | 15 +++++++-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 43 ++++++++++++++++++++++
 3 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index 1894b46..ca3e7af 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7bc03c4..8a41368 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.collection.{Iterable, Set, mutable}
+import scala.collection.mutable.ListBuffer
 import scala.util.control.ControlThrowable
 
 /**
@@ -863,6 +864,11 @@ private[log] class Cleaner(val id: Int,
                                   stats: CleanerStats) {
     map.clear()
     val dirty = log.logSegments(start, end).toBuffer
+    val nextSegmentStartOffsets = new ListBuffer[Long]
+    if (dirty.nonEmpty) {
+      for (nextSegment <- dirty.tail) nextSegmentStartOffsets.append(nextSegment.baseOffset)
+      nextSegmentStartOffsets.append(end)
+    }
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
 
     val transactionMetadata = new CleanedTransactionMetadata
@@ -872,10 +878,10 @@ private[log] class Cleaner(val id: Int,
     // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var full = false
-    for (segment <- dirty if !full) {
+    for ( (segment, nextSegmentStartOffset) <- dirty.zip(nextSegmentStartOffsets) if !full) {
       checkDone(log.topicPartition)
 
-      full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize,
+      full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, nextSegmentStartOffset, log.config.maxMessageSize,
         transactionMetadata, stats)
       if (full)
         debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
@@ -896,6 +902,7 @@ private[log] class Cleaner(val id: Int,
                                        segment: LogSegment,
                                        map: OffsetMap,
                                        startOffset: Long,
+                                       nextSegmentStartOffset: Long,
                                        maxLogMessageSize: Int,
                                        transactionMetadata: CleanedTransactionMetadata,
                                        stats: CleanerStats): Boolean = {
@@ -949,6 +956,10 @@ private[log] class Cleaner(val id: Int,
       if(position == startPosition)
         growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
     }
+
+    // In the case of offsets gap, fast forward to latest expected offset in this segment.
+    map.updateLatestOffset(nextSegmentStartOffset - 1L)
+
     restoreBuffers()
     false
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 17b5a24..7c42d7b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1554,6 +1554,49 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
   }
 
+  /**
+   * Verify that the clean is able to move beyond missing offsets records in dirty log
+   */
+  @Test
+  def testCleaningBeyondMissingOffsets(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024*1024: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    {
+      val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
+      writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+      // roll new segment with baseOffset 11, leaving previous with holes in offset range [9,10]
+      log.roll(Some(11L))
+
+      // active segment record
+      log.appendAsFollower(messageWithOffset(1015, 1015, 11L))
+
+      val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset))
+      assertEquals("Cleaning point should pass offset gap", log.activeSegment.baseOffset, nextDirtyOffset)
+    }
+
+
+    {
+      val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
+      writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+      // roll new segment with baseOffset 15, leaving previous with holes in offset rage [10, 14]
+      log.roll(Some(15L))
+
+      writeToLog(log, (15 to 24) zip (15 to 24), (15L to 24L))
+      // roll new segment with baseOffset 30, leaving previous with holes in offset range [25, 29]
+      log.roll(Some(30L))
+
+      // active segment record
+      log.appendAsFollower(messageWithOffset(1015, 1015, 30L))
+
+      val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset))
+      assertEquals("Cleaning point should pass offset gap in multiple segments", log.activeSegment.baseOffset, nextDirtyOffset)
+    }
+  }
+
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset