You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/06/21 01:00:45 UTC

kafka git commit: KAFKA-5413; Log cleaner fails due to large offset in segment file

Repository: kafka
Updated Branches:
  refs/heads/trunk de982ba3f -> 76f6e14b0


KAFKA-5413; Log cleaner fails due to large offset in segment file

The contribution is my original work, and I license the work to the project under the project's open source license.

junrao, I had already made the code change before your last comment.  I've done pretty much what you said, except that I've not used the current segment because I wasn't sure whether it would always be available.
I'm happy to change it if you prefer.
I've run all the unit and integration tests which all passed.

Author: Kelvin Rutt <ru...@gmail.com>
Author: Kelvin Rutt <ke...@sky.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #3357 from kelvinrutt/kafka_5413_bugfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76f6e14b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76f6e14b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76f6e14b

Branch: refs/heads/trunk
Commit: 76f6e14b07bd97d17f9275968a047d68b4658704
Parents: de982ba
Author: Kelvin Rutt <ru...@gmail.com>
Authored: Tue Jun 20 18:00:41 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 20 18:00:41 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 29 +++++++++-
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 58 +++++++++++++++++---
 2 files changed, 76 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/76f6e14b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 623586f..5222487 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -362,9 +362,10 @@ private[log] class Cleaner(val id: Int,
     // this is the lower of the last active segment and the compaction lag
     val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
 
+
     // group the segments and clean the groups
     info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
-    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
+    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset))
       cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
 
     // record buffer utilization
@@ -616,7 +617,7 @@ private[log] class Cleaner(val id: Int,
    *
    * @return A list of grouped segments
    */
-  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
+  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int, firstUncleanableOffset: Long): List[Seq[LogSegment]] = {
     var grouped = List[List[LogSegment]]()
     var segs = segments.toList
     while(segs.nonEmpty) {
@@ -629,7 +630,7 @@ private[log] class Cleaner(val id: Int,
             logSize + segs.head.size <= maxSize &&
             indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
             timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
-            segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
+            lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
         group = segs.head :: group
         logSize += segs.head.size
         indexSize += segs.head.index.sizeInBytes
@@ -642,6 +643,28 @@ private[log] class Cleaner(val id: Int,
   }
 
   /**
+    * We want to get the last offset in the first log segment in segs.
+    * LogSegment.nextOffset() gives the exact last offset in a segment, but can be expensive since it requires
+    * scanning the segment from the last index entry.
+    * Therefore, we estimate the last offset of the first log segment by using
+    * the base offset of the next segment in the list.
+    * If the next segment doesn't exist, first Uncleanable Offset will be used.
+    *
+    * @param segs - remaining segments to group.
+    * @return The estimated last offset for the first segment in segs
+    */
+  private def lastOffsetForFirstSegment(segs: List[LogSegment], firstUncleanableOffset: Long): Long = {
+    if (segs.size > 1) {
+      /* if there is a next segment, use its base offset as the bounding offset to guarantee we know
+       * the worst case offset */
+      segs(1).baseOffset - 1
+    } else {
+      //for the last segment in the list, use the first uncleanable offset.
+      firstUncleanableOffset - 1
+    }
+  }
+
+  /**
    * Build a map of key_hash => offset for the keys in the cleanable dirty portion of the log to use in cleaning.
    * @param log The log to use
    * @param start The offset at which dirty messages begin

http://git-wip-us.apache.org/repos/asf/kafka/blob/76f6e14b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 5e95dc2..dabd2d6 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -642,17 +642,17 @@ class LogCleanerTest extends JUnitSuite {
     }
 
     // grouping by very large values should result in a single group with all the segments in it
-    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
     assertEquals(1, groups.size)
     assertEquals(log.numberOfSegments, groups.head.size)
     checkSegmentOrder(groups)
 
     // grouping by very small values should result in all groups having one entry
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue, log.logEndOffset)
     assertEquals(log.numberOfSegments, groups.size)
     assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
     checkSegmentOrder(groups)
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1, log.logEndOffset)
     assertEquals(log.numberOfSegments, groups.size)
     assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
     checkSegmentOrder(groups)
@@ -661,13 +661,13 @@ class LogCleanerTest extends JUnitSuite {
 
     // check grouping by log size
     val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue, log.logEndOffset)
     checkSegmentOrder(groups)
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
 
     // check grouping by index size
     val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize, log.logEndOffset)
     checkSegmentOrder(groups)
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
   }
@@ -699,14 +699,14 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
 
     // grouping should result in a single group with maximum relative offset of Int.MaxValue
-    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
     assertEquals(1, groups.size)
 
     // append another message, making last offset of second segment > Int.MaxValue
     log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
 
     // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
     assertEquals(2, groups.size)
     checkSegmentOrder(groups)
 
@@ -714,13 +714,55 @@ class LogCleanerTest extends JUnitSuite {
     while (log.numberOfSegments < 4)
       log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
 
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
     assertEquals(log.numberOfSegments - 1, groups.size)
     for (group <- groups)
       assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
     checkSegmentOrder(groups)
   }
 
+  /** 
+   * Following the loading of a log segment where the index file is zero sized,
+   * the index returned would be the base offset.  Sometimes the log file would
+   * contain data with offsets in excess of the baseOffset which would cause
+   * the log cleaner to group together segments with a range of > Int.MaxValue
+   * this test replicates that scenario to ensure that the segments are grouped
+   * correctly.
+   */
+  @Test
+  def testSegmentGroupingFollowingLoadOfZeroIndex(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 400: java.lang.Integer)
+
+    //mimic the effect of loading an empty index file
+    logProps.put(LogConfig.IndexIntervalBytesProp, 400: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val record1 = messageWithOffset("hello".getBytes, "hello".getBytes, 0)
+    log.appendAsFollower(record1)
+    val record2 = messageWithOffset("hello".getBytes, "hello".getBytes, 1)
+    log.appendAsFollower(record2)
+    log.roll(Int.MaxValue/2) // starting a new log segment at offset Int.MaxValue/2
+    val record3 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue/2)
+    log.appendAsFollower(record3)
+    val record4 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue.toLong + 1)
+    log.appendAsFollower(record4)
+
+    assertTrue("Actual offset range should be > Int.MaxValue", log.logEndOffset - 1 - log.logStartOffset > Int.MaxValue)
+    assertTrue("index.lastOffset is reporting the wrong last offset", log.logSegments.last.index.lastOffset - log.logStartOffset <= Int.MaxValue)
+
+    // grouping should result in two groups because the second segment takes the offset range > MaxInt
+    val groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
+    assertEquals(2, groups.size)
+
+    for (group <- groups)
+      assertTrue("Relative offset greater than Int.MaxValue", group.last.nextOffset() - 1 - group.head.baseOffset <= Int.MaxValue)
+    checkSegmentOrder(groups)
+  }
+
   private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
     val offsets = groups.flatMap(_.map(_.baseOffset))
     assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)