Posted to commits@kafka.apache.org by ju...@apache.org on 2020/04/29 18:11:03 UTC

[kafka] branch 2.4 updated: [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8543)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 9d447be  [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8543)
9d447be is described below

commit 9d447be1ef8399bb162944bd0ded02384162f41f
Author: Steve Rodrigues <42...@users.noreply.github.com>
AuthorDate: Wed Apr 29 11:10:12 2020 -0700

    [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8543)
    
    In KAFKA-9826, a log whose first dirty offset was past the start of the active segment and past the last cleaned point resulted in an endless cycle of picking the segment to clean and discarding it. Though this didn't interfere with cleaning other log segments, it kept the log cleaner thread continuously busy (potentially wasting CPU and impacting other running threads) and filled the logs with lots of extraneous messages.
    
    This was determined to be because the active segment was mistakenly being picked for cleaning, and because the logSegments code handled the (start == end) case correctly only when (start, end) fell on a segment boundary: the intent is to return an empty list, but if the offsets are not on a segment boundary, the routine returns the segment that contains them.
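
    As a minimal, hypothetical illustration of that boundary behavior (using a bare ConcurrentSkipListMap of base offsets in place of the real Log.segments map, with made-up segment labels), a pre-fix-style floor/subMap lookup returns the containing segment whenever start == end falls inside a segment:

        import java.util.concurrent.ConcurrentSkipListMap

        object FloorSubMapDemo extends App {
          // Stand-in for Log.segments: base offset -> segment label (hypothetical data).
          val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
          segments.put(0L, "seg-0")
          segments.put(10L, "seg-10")
          segments.put(20L, "seg-20")

          // Pre-fix style lookup: floor "from" to a segment base offset,
          // then take the half-open sub-map [floor, to).
          def oldStyleLookup(from: Long, to: Long) = {
            val floor = segments.floorKey(from)
            segments.subMap(floor, to)
          }

          println(oldStyleLookup(10, 10)) // {}           -- empty, as intended on a segment boundary
          println(oldStyleLookup(15, 15)) // {10=seg-10}  -- the containing segment leaks back out
        }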
    
    This fix has two parts:
    
    1. It changes logSegments to handle start == end by always returning an empty List.
    
    2. It changes the definition of calculateCleanableBytes to not consider anything past the firstUncleanableOffset; previously, it would potentially shift the uncleanable offset up to match the firstDirtyOffset even if the firstDirtyOffset was past the firstUncleanableOffset. This has no real effect now, given the fix in (1), but it makes the code read more like the model it is attempting to follow.
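
    As a quick, hypothetical worked example of the new bounds (REPL-style Scala, offsets chosen purely for illustration): when the first dirty offset has drifted past the first uncleanable offset, the old expression widened the summed range up to the dirty offset, while the new one clamps it:

        // Hypothetical offsets, chosen only to illustrate the bounds calculation.
        val firstDirtyOffset = 25L
        val firstUncleanableOffset = 20L

        // Old bounds: (25, 25) -- the uncleanable offset is effectively shifted up to the dirty offset.
        val oldBounds = (firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset))

        // New bounds: (20, 20) -- nothing past the uncleanable offset is counted, and with the
        // logSegments fix above an equal-bounds range yields no segments (zero cleanable bytes).
        val newBounds = (math.min(firstDirtyOffset, firstUncleanableOffset), firstUncleanableOffset)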
    
    These changes require modifications to a few test cases that exercised this particular situation; those tests were introduced in the context of KAFKA-8764. The situations they covered are now handled elsewhere in the code, but the tests themselves allowed a dirty offset in the active segment and expected the active segment to be selected for cleaning.
    
    Reviewer: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            | 23 ++++++++------
 .../main/scala/kafka/log/LogCleanerManager.scala   |  2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 10 ++++---
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 35 ++++++++++++++++++++++
 4 files changed, 56 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 91bb1eb..401cb2c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2140,17 +2140,22 @@ class Log(@volatile var dir: File,
 
   /**
    * Get all segments beginning with the segment that includes "from" and ending with the segment
-   * that includes up to "to-1" or the end of the log (if to > logEndOffset)
+   * that includes up to "to-1" or the end of the log (if to > logEndOffset).
    */
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
-    lock synchronized {
-      val view = Option(segments.floorKey(from)).map { floor =>
-        if (to < floor)
-          throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " +
-            s"from offset $from mapping to segment with base offset $floor, which is greater than limit offset $to")
-        segments.subMap(floor, to)
-      }.getOrElse(segments.headMap(to))
-      view.values.asScala
+    if (from == to) {
+      // Handle non-segment-aligned empty sets
+      List.empty[LogSegment]
+    } else if (to < from) {
+      throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " +
+        s"from offset $from which is greater than limit offset $to")
+    } else {
+      lock synchronized {
+        val view = Option(segments.floorKey(from)).map { floor =>
+          segments.subMap(floor, to)
+        }.getOrElse(segments.headMap(to))
+        view.values.asScala
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 19f7e4d..04a97fd 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -607,7 +607,7 @@ private[log] object LogCleanerManager extends Logging {
   def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
     val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
     val firstUncleanableOffset = firstUncleanableSegment.baseOffset
-    val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
+    val cleanableBytes = log.logSegments(math.min(firstDirtyOffset, firstUncleanableOffset), firstUncleanableOffset).map(_.size.toLong).sum
 
     (firstUncleanableOffset, cleanableBytes)
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 5e2620c..ed5b1e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -237,8 +237,9 @@ class LogCleanerManagerTest extends Logging {
     val cleanerManager = createCleanerManagerMock(logs)
     cleanerCheckpoints.put(tp, 0L)
 
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
-    assertEquals(2L, filthiestLog.firstDirtyOffset)
+    // The active segment is uncleanable and hence not filthy from the POV of the CleanerManager.
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals(None, filthiestLog)
   }
 
   @Test
@@ -262,8 +263,9 @@ class LogCleanerManagerTest extends Logging {
     val cleanerManager = createCleanerManagerMock(logs)
     cleanerCheckpoints.put(tp, 3L)
 
-    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
-    assertEquals(3L, filthiestLog.firstDirtyOffset)
+    // These segments are uncleanable and hence not filthy
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals(None, filthiestLog)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index d0eb5a3..19067ab 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -469,6 +469,41 @@ class LogTest {
     assertEquals(101L, log.logEndOffset)
   }
 
+  /**
+   * Test the values returned by the logSegments call
+   */
+  @Test
+  def testLogSegmentsCallCorrect(): Unit = {
+    // Create 3 segments and make sure we get the right values from various logSegments calls.
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
+    def getSegmentOffsets(log :Log, from: Long, to: Long) = log.logSegments(from, to).map { _.baseOffset }
+    val setSize = createRecords.sizeInBytes
+    val msgPerSeg = 10
+    val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
+    // create a log
+    val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
+    val log = createLog(logDir, logConfig)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+    // segments expire in size
+    for (_ <- 1 to (2 * msgPerSeg + 2))
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+
+    // from == to should always be empty
+    assertEquals(List.empty[LogSegment], getSegmentOffsets(log, 10, 10))
+    assertEquals(List.empty[LogSegment], getSegmentOffsets(log, 15, 15))
+
+    assertEquals(List[Long](0, 10, 20), getSegmentOffsets(log, 0, 21))
+
+    assertEquals(List[Long](0), getSegmentOffsets(log, 1, 5))
+    assertEquals(List[Long](10, 20), getSegmentOffsets(log, 13, 21))
+    assertEquals(List[Long](10), getSegmentOffsets(log, 13, 17))
+
+    // to < from is bad
+    assertThrows[IllegalArgumentException]({ log.logSegments(10, 0) })
+  }
+
   @Test
   def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
     // simulate the upgrade path by creating a new log with several segments, deleting the