You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/12/06 18:18:00 UTC

[kafka] branch 1.1 updated: KAFKA-6388; Recover from rolling an empty segment that already exists (#6006)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 0074414  KAFKA-6388; Recover from rolling an empty segment that already exists (#6006)
0074414 is described below

commit 0074414a42036370bae892ab784311a4c310ff5e
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Thu Dec 6 10:17:49 2018 -0800

    KAFKA-6388; Recover from rolling an empty segment that already exists (#6006)
    
    There were several reported incidents where the log is rolled to a new segment with the same base offset as an active segment, causing KafkaException: Trying to roll a new log segment for topic partition X-N with start offset M while it already exists. In the cases we have seen, this happens to an empty log segment where there is long idle time before the next append and somehow we get to a state where offsetIndex.isFull() returns true due to _maxEntries == 0. This PR recovers from th [...]
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/Log.scala            | 48 +++++++++++++------
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 54 ++++++++++++++++++++--
 3 files changed, 85 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index eeb569a..31acaea 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1318,7 +1318,7 @@ class Log(@volatile var dir: File,
         base offset was too low to contain the next message.  This edge case is possible when a replica is recovering a
         highly compacted topic from scratch.
        */
-      roll(maxOffsetInMessages - Integer.MAX_VALUE)
+      roll(Some(maxOffsetInMessages - Integer.MAX_VALUE))
     } else {
       segment
     }
@@ -1330,22 +1330,44 @@ class Log(@volatile var dir: File,
    *
    * @return The newly rolled segment
    */
-  def roll(expectedNextOffset: Long = 0): LogSegment = {
+  def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
     maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
       val start = time.hiResClockMs()
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
-        val newOffset = math.max(expectedNextOffset, logEndOffset)
+        val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
         val logFile = Log.logFile(dir, newOffset)
-        val offsetIdxFile = offsetIndexFile(dir, newOffset)
-        val timeIdxFile = timeIndexFile(dir, newOffset)
-        val txnIdxFile = transactionIndexFile(dir, newOffset)
-        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
-          warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
-          Files.delete(file.toPath)
-        }
 
-        Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
+        if (segments.containsKey(newOffset)) {
+          // segment with the same base offset already exists and loaded
+          if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
+            // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
+            // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0).
+            warn(s"Trying to roll a new log segment with start offset $newOffset " +
+                 s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
+                 s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
+                 s" size of offset index: ${activeSegment.offsetIndex.entries}.")
+            deleteSegment(activeSegment)
+          } else {
+            throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
+                                     s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
+                                     s"segment is ${segments.get(newOffset)}.")
+          }
+        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
+          throw new KafkaException(
+            s"Trying to roll a new log segment for topic partition $topicPartition with " +
+            s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
+        } else {
+          val offsetIdxFile = offsetIndexFile(dir, newOffset)
+          val timeIdxFile = timeIndexFile(dir, newOffset)
+          val txnIdxFile = transactionIndexFile(dir, newOffset)
+          for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
+            warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
+            Files.delete(file.toPath)
+          }
+
+          Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
+        }
 
         // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
         // offset align with the new segment offset since this ensures we can recover the segment by beginning
@@ -1362,9 +1384,7 @@ class Log(@volatile var dir: File,
           fileAlreadyExists = false,
           initFileSize = initFileSize,
           preallocate = config.preallocate)
-        val prev = addSegment(segment)
-        if (prev != null)
-          throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
+        addSegment(segment)
         // We need to update the segment base offset and append position data of the metadata when log rolls.
         // The next offset should not change.
         updateLogEndOffset(nextOffsetMetadata.messageOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 6e11a1e..26455f1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1038,7 +1038,7 @@ class LogCleanerTest extends JUnitSuite {
     log.appendAsFollower(record1)
     val record2 = messageWithOffset("hello".getBytes, "hello".getBytes, 1)
     log.appendAsFollower(record2)
-    log.roll(Int.MaxValue/2) // starting a new log segment at offset Int.MaxValue/2
+    log.roll(Some(Int.MaxValue/2)) // starting a new log segment at offset Int.MaxValue/2
     val record3 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue/2)
     log.appendAsFollower(record3)
     val record4 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue.toLong + 1)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e0dcdfd..6be9203 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -139,6 +139,52 @@ class LogTest {
     assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
+  @Test
+  def testRollSegmentThatAlreadyExists() {
+    val logConfig = createLogConfig(segmentMs = 1 * 60 * 60L)
+
+    // create a log
+    val log = createLog(logDir, logConfig)
+    assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
+
+    // roll active segment with the same base offset of size zero should recreate the segment
+    log.roll(Some(0L))
+    assertEquals("Expect 1 segment after roll() empty segment with base offset.", 1, log.numberOfSegments)
+
+    // should be able to append records to active segment
+    val records = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds, "k1".getBytes, "v1".getBytes)),
+      baseOffset = 0L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records)
+    assertEquals("Expect one segment.", 1, log.numberOfSegments)
+    assertEquals(0L, log.activeSegment.baseOffset)
+
+    // make sure we can append more records
+    val records2 = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds + 10, "k2".getBytes, "v2".getBytes)),
+      baseOffset = 1L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records2)
+
+    assertEquals("Expect two records in the log.", 2, log.logEndOffset)
+    assertEquals(0, log.readUncommitted(0, 100, Some(1)).records.batches.iterator.next().lastOffset)
+    assertEquals(1, log.readUncommitted(1, 100, Some(2)).records.batches.iterator.next().lastOffset)
+
+    // roll so that active segment is empty
+    log.roll()
+    assertEquals("Expect base offset of active segment to be LEO", 2L, log.activeSegment.baseOffset)
+    assertEquals("Expect two segments.", 2, log.numberOfSegments)
+
+    // manually resize offset index to force roll of an empty active segment on next append
+    log.activeSegment.offsetIndex.resize(0)
+    val records3 = TestUtils.records(
+      List(new SimpleRecord(mockTime.milliseconds + 12, "k3".getBytes, "v3".getBytes)),
+      baseOffset = 2L, partitionLeaderEpoch = 0)
+    log.appendAsFollower(records3)
+    assertTrue(log.activeSegment.offsetIndex.maxEntries > 1)
+    assertEquals(2, log.readUncommitted(2, 100, Some(3)).records.batches.iterator.next().lastOffset)
+    assertEquals("Expect two segments.", 2, log.numberOfSegments)
+  }
+
   @Test(expected = classOf[OutOfOrderSequenceException])
   def testNonSequentialAppend(): Unit = {
     // create a log
@@ -789,17 +835,17 @@ class LogTest {
     val logConfig = createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
-    log.roll(1L)
+    log.roll(Some(1L))
     assertEquals(Some(1L), log.latestProducerSnapshotOffset)
     assertEquals(Some(1L), log.oldestProducerSnapshotOffset)
 
     log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), leaderEpoch = 0)
-    log.roll(2L)
+    log.roll(Some(2L))
     assertEquals(Some(2L), log.latestProducerSnapshotOffset)
     assertEquals(Some(1L), log.oldestProducerSnapshotOffset)
 
     log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), leaderEpoch = 0)
-    log.roll(3L)
+    log.roll(Some(3L))
     assertEquals(Some(3L), log.latestProducerSnapshotOffset)
 
     // roll triggers a flush at the starting offset of the new segment, we should retain all snapshots
@@ -1243,7 +1289,7 @@ class LogTest {
     val logConfig = createLogConfig()
     val log = createLog(logDir,  logConfig)
     log.closeHandlers()
-    log.roll(1)
+    log.roll(Some(1))
   }
 
   @Test