Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/04 21:08:30 UTC

[kafka] branch trunk updated: KAFKA-8306; Initialize log end offset accurately when start offset is non-zero (#6652)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56b92a5  KAFKA-8306; Initialize log end offset accurately when start offset is non-zero (#6652)
56b92a5 is described below

commit 56b92a550454765b09047be9ad561f3395c614e4
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Sat May 4 14:08:16 2019 -0700

    KAFKA-8306; Initialize log end offset accurately when start offset is non-zero (#6652)
    
    This patch ensures that the log end offset of each partition is initialized consistently with the checkpointed log start offset.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/Log.scala          | 37 ++++++++----
 core/src/test/scala/unit/kafka/log/LogTest.scala | 74 ++++++++++++++++++------
 2 files changed, 82 insertions(+), 29 deletions(-)
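
The core of the change below is the initialization decision during log loading: if recovery leaves no segments, or only segments whose end offset falls below the checkpointed log start offset, the stale segments are deleted and a single new segment is created at logStartOffset instead of at offset 0. A minimal, self-contained Scala sketch of that decision (illustrative names only — Segment and initialize are not Kafka's actual Log/LogSegment API):

    object LogInitSketch {
      // Illustrative stand-in for a log segment: only the offsets matter here.
      final case class Segment(baseOffset: Long, nextOffset: Long)

      // Returns the segments to keep, plus the base offset of a newly created
      // active segment if one has to be created.
      def initialize(segments: Seq[Segment], logStartOffset: Long): (Seq[Segment], Option[Long]) = {
        val logEndOffset = segments.lastOption.map(_.nextOffset).getOrElse(0L)
        // If everything on disk ends before the checkpointed start offset, the data is stale: drop it all.
        val kept = if (segments.nonEmpty && logEndOffset < logStartOffset) Seq.empty[Segment] else segments
        // With nothing left (or nothing to begin with), start a fresh segment at logStartOffset, not 0.
        if (kept.isEmpty) (kept, Some(logStartOffset)) else (kept, None)
      }

      def main(args: Array[String]): Unit = {
        println(initialize(Nil, logStartOffset = 500L))    // (List(), Some(500))
        println(initialize(Seq(Segment(0L, 400L)), 500L))  // (List(), Some(500))
        println(initialize(Seq(Segment(0L, 400L)), 100L))  // (List(Segment(0,400)), None)
      }
    }

The first case corresponds to testLogReinitializeAfterManualDelete and the second to testLogEndLessThanStartAfterReopen in the test changes below.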

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index df71560..4e1e3c7 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -570,17 +570,7 @@ class Log(@volatile var dir: File,
     // before the swap file is restored as the new segment file.
     completeSwapOperations(swapFiles)
 
-    if (logSegments.isEmpty) {
-      // no existing segments, create a new mutable segment beginning at offset 0
-      addSegment(LogSegment.open(dir = dir,
-        baseOffset = 0,
-        config,
-        time = time,
-        fileAlreadyExists = false,
-        initFileSize = this.initFileSize,
-        preallocate = config.preallocate))
-      0
-    } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
+    if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
       val nextOffset = retryOnOffsetOverflow {
         recoverLog()
       }
@@ -588,7 +578,9 @@ class Log(@volatile var dir: File,
       // reset the index size of the currently active log segment to allow more entries
       activeSegment.resizeIndexes(config.maxIndexSize)
       nextOffset
-    } else 0
+    } else {
+      0
+    }
   }
 
   private def updateLogEndOffset(messageOffset: Long) {
@@ -626,6 +618,27 @@ class Log(@volatile var dir: File,
         }
       }
     }
+
+    if (logSegments.nonEmpty) {
+      val logEndOffset = activeSegment.readNextOffset
+      if (logEndOffset < logStartOffset) {
+        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
+          "This could happen if segment files were deleted from the file system.")
+        logSegments.foreach(deleteSegment)
+      }
+    }
+
+    if (logSegments.isEmpty) {
+      // no existing segments, create a new mutable segment beginning at logStartOffset
+      addSegment(LogSegment.open(dir = dir,
+        baseOffset = logStartOffset,
+        config,
+        time = time,
+        fileAlreadyExists = false,
+        initFileSize = this.initFileSize,
+        preallocate = config.preallocate))
+    }
+
     recoveryPoint = activeSegment.readNextOffset
     recoveryPoint
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3d6fb0d..8635969 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -43,9 +43,9 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.Assertions
 
-import scala.collection.Iterable
+import scala.collection.{Iterable, mutable}
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ListBuffer
 import org.scalatest.Assertions.{assertThrows, intercept, withClue}
 
 class LogTest {
@@ -278,6 +278,47 @@ class LogTest {
     testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.latestVersion.version)
   }
 
+  @Test
+  def testLogReinitializeAfterManualDelete(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    // simulate a case where log data does not exist but the start offset is non-zero
+    val log = createLog(logDir, logConfig, logStartOffset = 500)
+    assertEquals(500, log.logStartOffset)
+    assertEquals(500, log.logEndOffset)
+  }
+
+  @Test
+  def testLogEndLessThanStartAfterReopen(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    var log = createLog(logDir, logConfig)
+    for (i <- 0 until 5) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(6, log.logSegments.size)
+
+    // Increment the log start offset
+    val startOffset = 4
+    log.maybeIncrementLogStartOffset(startOffset)
+    assertTrue(log.logEndOffset > log.logStartOffset)
+
+    // Append garbage to a segment below the current log start offset
+    val segmentToForceTruncation = log.logSegments.take(2).last
+    val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file))
+    bw.write("corruptRecord")
+    bw.close()
+    log.close()
+
+    // Reopen the log. This will cause truncation of the segment to which we appended garbage and deletion of all other segments.
+    // All remaining segments will be lower than the current log start offset, which will force deletion of all segments
+    // and recreation of a single, active segment starting at logStartOffset.
+    log = createLog(logDir, logConfig, logStartOffset = startOffset)
+    assertEquals(1, log.logSegments.size)
+    assertEquals(startOffset, log.logStartOffset)
+    assertEquals(startOffset, log.logEndOffset)
+  }
+
   private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
     var log = createLog(logDir, logConfig)
@@ -296,21 +337,21 @@ class LogTest {
     // 1 segment. We collect the data before closing the log.
     val offsetForSegmentAfterRecoveryPoint = segmentOffsets(segmentOffsets.size - 3)
     val offsetForRecoveryPointSegment = segmentOffsets(segmentOffsets.size - 4)
-    val (segOffsetsBeforeRecovery, segOffsetsAfterRecovery) = segmentOffsets.partition(_ < offsetForRecoveryPointSegment)
+    val (segOffsetsBeforeRecovery, segOffsetsAfterRecovery) = segmentOffsets.toSet.partition(_ < offsetForRecoveryPointSegment)
     val recoveryPoint = offsetForRecoveryPointSegment + 1
     assertTrue(recoveryPoint < offsetForSegmentAfterRecoveryPoint)
     log.close()
 
-    val segmentsWithReads = ArrayBuffer[LogSegment]()
-    val recoveredSegments = ArrayBuffer[LogSegment]()
-    val expectedSegmentsWithReads = ArrayBuffer[Long]()
-    val expectedSnapshotOffsets = ArrayBuffer[Long]()
+    val segmentsWithReads = mutable.Set[LogSegment]()
+    val recoveredSegments = mutable.Set[LogSegment]()
+    val expectedSegmentsWithReads = mutable.Set[Long]()
+    val expectedSnapshotOffsets = mutable.Set[Long]()
 
     if (logConfig.messageFormatVersion < KAFKA_0_11_0_IV0) {
       expectedSegmentsWithReads += activeSegmentOffset
       expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
     } else {
-      expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Seq(activeSegmentOffset)
+      expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Set(activeSegmentOffset)
       expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
     }
 
@@ -351,7 +392,7 @@ class LogTest {
     // We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour)
     assertEquals(expectedSegmentsWithReads, segmentsWithReads.map(_.baseOffset))
     assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
-    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets.toSet)
     log.close()
     segmentsWithReads.clear()
     recoveredSegments.clear()
@@ -360,9 +401,9 @@ class LogTest {
     // avoid reading all segments
     ProducerStateManager.deleteSnapshotsBefore(logDir, offsetForRecoveryPointSegment)
     log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint)
-    assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset))
+    assertEquals(Set(activeSegmentOffset), segmentsWithReads.map(_.baseOffset))
     assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
-    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+    assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets.toSet)
 
     // Verify that we keep 2 snapshot files if we checkpoint the log end offset
     log.deleteSnapshotsAfterRecoveryPointCheckpoint()
@@ -2936,18 +2977,17 @@ class LogTest {
   @Test
   def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
     def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
-    val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
-
-    // Create log with start offset ahead of the first log segment
-    val log = createLog(logDir, logConfig, brokerTopicStats, logStartOffset = 5L)
+    val recordsPerSegment = 5
+    val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, cleanupPolicy = "compact")
+    val log = createLog(logDir, logConfig, brokerTopicStats)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    // Three segments should be created, with the first one entirely preceding the log start offset
+    // Three segments should be created
     assertEquals(3, log.logSegments.count(_ => true))
-    assertTrue(log.logSegments.slice(1, 2).head.baseOffset <= log.logStartOffset)
+    log.maybeIncrementLogStartOffset(recordsPerSegment)
 
     // The first segment, which is entirely before the log start offset, should be deleted
     // Of the remaining segments, the first can overlap the log start offset and the rest must have a base offset