Posted to commits@kafka.apache.org by jg...@apache.org on 2018/12/05 23:04:57 UTC

[kafka] branch 2.0 updated: KAFKA-6388; Recover from rolling an empty segment that already exists (#5986)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new fc9532d  KAFKA-6388; Recover from rolling an empty segment that already exists (#5986)
fc9532d is described below

commit fc9532d44e3b892ae2e4089f1f987210440eb41e
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Wed Dec 5 14:49:19 2018 -0800

    KAFKA-6388; Recover from rolling an empty segment that already exists (#5986)
    
    There were several reported incidents where the log was rolled to a new segment with the same base offset as the active segment, causing "KafkaException: Trying to roll a new log segment for topic partition X-N with start offset M while it already exists". In the cases we have seen, this happens to an empty log segment after a long idle time before the next append: we somehow reach a state where offsetIndex.isFull() returns true because _maxEntries == 0. This PR recovers from th [...]
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
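    
    To make the failure mode concrete: the offset index reports itself as
    full by comparing its entry count against its capacity, so an index
    whose capacity has collapsed to zero looks full even when empty. A
    minimal, runnable sketch of that condition (IndexSketch is an
    illustrative stand-in, not the real kafka.log.AbstractIndex):
    
        // Simplified stand-in for the index capacity check.
        final class IndexSketch(val maxEntries: Int, var entries: Int = 0) {
          def isFull: Boolean = entries >= maxEntries
        }
    
        object Kafka6388Sketch extends App {
          val idx = new IndexSketch(maxEntries = 0)
          // Prints "true": a zero-capacity index looks full while empty,
          // so shouldRoll() tries to roll the empty active segment onto
          // its own base offset.
          println(idx.isFull)
        }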
---
 core/src/main/scala/kafka/log/Log.scala            | 52 +++++++++++++------
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 59 ++++++++++++++++++++--
 3 files changed, 92 insertions(+), 21 deletions(-)
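
In the diff below, roll() now takes an Option[Long] so callers can distinguish "no expected offset" (None) from a literal offset of 0. A minimal sketch of the resulting base-offset selection, with newBaseOffset as a hypothetical standalone helper (the real logic is inline in Log.roll() and additionally checks for collisions with existing segments):

    // Sketch of the offset selection used by the new roll():
    def newBaseOffset(expectedNextOffset: Option[Long], logEndOffset: Long): Long =
      math.max(expectedNextOffset.getOrElse(0L), logEndOffset)

    // newBaseOffset(None, 42L)       == 42L  -> roll at the current log end offset
    // newBaseOffset(Some(100L), 42L) == 100L -> roll at the caller-provided offset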

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 4e335cc..57b4a2d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1447,8 +1447,8 @@ class Log(@volatile var dir: File,
         in the header.
       */
       appendInfo.firstOffset match {
-        case Some(firstOffset) => roll(firstOffset)
-        case None => roll(maxOffsetInMessages - Integer.MAX_VALUE)
+        case Some(firstOffset) => roll(Some(firstOffset))
+        case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE))
       }
     } else {
       segment
@@ -1461,22 +1461,45 @@ class Log(@volatile var dir: File,
    *
    * @return The newly rolled segment
    */
-  def roll(expectedNextOffset: Long = 0): LogSegment = {
+  def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
     maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
       val start = time.hiResClockMs()
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
-        val newOffset = math.max(expectedNextOffset, logEndOffset)
+        val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
         val logFile = Log.logFile(dir, newOffset)
-        val offsetIdxFile = offsetIndexFile(dir, newOffset)
-        val timeIdxFile = timeIndexFile(dir, newOffset)
-        val txnIdxFile = transactionIndexFile(dir, newOffset)
-        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
-          warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
-          Files.delete(file.toPath)
-        }
 
-        Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
+        if (segments.containsKey(newOffset)) {
+          // a segment with the same base offset already exists and is loaded
+          if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
+            // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
+            // active segment of size zero because one of the indexes is "full" (due to _maxEntries == 0).
+            warn(s"Trying to roll a new log segment with start offset $newOffset " +
+                 s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
+                 s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
+                 s" size of offset index: ${activeSegment.offsetIndex.entries}.")
+            deleteSegment(activeSegment)
+          } else {
+            throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
+                                     s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
+                                     s"segment is ${segments.get(newOffset)}.")
+          }
+        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
+          throw new KafkaException(
+            s"Trying to roll a new log segment for topic partition $topicPartition with " +
+            s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
+        } else {
+          val offsetIdxFile = offsetIndexFile(dir, newOffset)
+          val timeIdxFile = timeIndexFile(dir, newOffset)
+          val txnIdxFile = transactionIndexFile(dir, newOffset)
+
+          for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
+            warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
+            Files.delete(file.toPath)
+          }
+
+          Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
+        }
 
         // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
         // offset align with the new segment offset since this ensures we can recover the segment by beginning
@@ -1493,10 +1516,7 @@ class Log(@volatile var dir: File,
           fileAlreadyExists = false,
           initFileSize = initFileSize,
           preallocate = config.preallocate)
-        val prev = addSegment(segment)
-        if (prev != null)
-          throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with " +
-            s"start offset $newOffset while it already exists.")
+        addSegment(segment)
         // We need to update the segment base offset and append position data of the metadata when log rolls.
         // The next offset should not change.
         updateLogEndOffset(nextOffsetMetadata.messageOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ff5af61..51477b6 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1118,7 +1118,7 @@ class LogCleanerTest extends JUnitSuite {
     log.appendAsFollower(record1)
     val record2 = messageWithOffset("hello".getBytes, "hello".getBytes, 1)
     log.appendAsFollower(record2)
-    log.roll(Int.MaxValue/2) // starting a new log segment at offset Int.MaxValue/2
+    log.roll(Some(Int.MaxValue/2)) // starting a new log segment at offset Int.MaxValue/2
     val record3 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue/2)
     log.appendAsFollower(record3)
     val record4 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue.toLong + 1)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e584b8c..b0f215e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -141,6 +141,52 @@ class LogTest {
     assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
+  @Test
+  def testRollSegmentThatAlreadyExists() {
+    val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
+
+    // create a log
+    val log = createLog(logDir, logConfig)
+    assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
+
+    // rolling an empty active segment at its existing base offset should recreate the segment
+    log.roll(Some(0L))
+    assertEquals("Expect 1 segment after roll() empty segment with base offset.", 1, log.numberOfSegments)
+
+    // should be able to append records to active segment
+    val records = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds, "k1".getBytes, "v1".getBytes)),
+      baseOffset = 0L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records)
+    assertEquals("Expect one segment.", 1, log.numberOfSegments)
+    assertEquals(0L, log.activeSegment.baseOffset)
+
+    // make sure we can append more records
+    val records2 = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds + 10, "k2".getBytes, "v2".getBytes)),
+      baseOffset = 1L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records2)
+
+    assertEquals("Expect two records in the log", 2, log.logEndOffset)
+    assertEquals(0, readLog(log, 0, 100, Some(1)).records.batches.iterator.next().lastOffset)
+    assertEquals(1, readLog(log, 1, 100, Some(2)).records.batches.iterator.next().lastOffset)
+
+    // roll so that active segment is empty
+    log.roll()
+    assertEquals("Expect base offset of active segment to be LEO", 2L, log.activeSegment.baseOffset)
+    assertEquals("Expect two segments.", 2, log.numberOfSegments)
+
+    // manually resize offset index to force roll of an empty active segment on next append
+    log.activeSegment.offsetIndex.resize(0)
+    val records3 = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds + 12, "k3".getBytes, "v3".getBytes)),
+      baseOffset = 2L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records3)
+    assertTrue(log.activeSegment.offsetIndex.maxEntries > 1)
+    assertEquals(2, readLog(log, 2, 100, Some(3)).records.batches.iterator.next().lastOffset)
+    assertEquals("Expect two segments.", 2, log.numberOfSegments)
+  }
+
   @Test(expected = classOf[OutOfOrderSequenceException])
   def testNonSequentialAppend(): Unit = {
     // create a log
@@ -827,17 +873,17 @@ class LogTest {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
-    log.roll(1L)
+    log.roll(Some(1L))
     assertEquals(Some(1L), log.latestProducerSnapshotOffset)
     assertEquals(Some(1L), log.oldestProducerSnapshotOffset)
 
     log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), leaderEpoch = 0)
-    log.roll(2L)
+    log.roll(Some(2L))
     assertEquals(Some(2L), log.latestProducerSnapshotOffset)
     assertEquals(Some(1L), log.oldestProducerSnapshotOffset)
 
     log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), leaderEpoch = 0)
-    log.roll(3L)
+    log.roll(Some(3L))
     assertEquals(Some(3L), log.latestProducerSnapshotOffset)
 
     // roll triggers a flush at the starting offset of the new segment, we should retain all snapshots
@@ -1281,7 +1327,7 @@ class LogTest {
     val logConfig = LogTest.createLogConfig()
     val log = createLog(logDir,  logConfig)
     log.closeHandlers()
-    log.roll(1)
+    log.roll(Some(1L))
   }
 
   @Test
@@ -3512,6 +3558,11 @@ class LogTest {
       expectDeletedFiles)
   }
 
+  private def readLog(log: Log, startOffset: Long, maxLength: Int,
+                      maxOffset: Option[Long] = None,
+                      minOneMessage: Boolean = true): FetchDataInfo = {
+    log.read(startOffset, maxLength, maxOffset, minOneMessage, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+  }
 }
 
 object LogTest {