You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/04 21:56:11 UTC
[kafka] branch 2.0 updated: KAFKA-8306;
Initialize log end offset accurately when start offset is non-zero
(#6652)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 1f902a5 KAFKA-8306; Initialize log end offset accurately when start offset is non-zero (#6652)
1f902a5 is described below
commit 1f902a52fb935240f0110186c2362aa4f5a30941
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Sat May 4 14:08:16 2019 -0700
KAFKA-8306; Initialize log end offset accurately when start offset is non-zero (#6652)
This patch ensures that the log end offset of each partition is initialized consistently with the checkpointed log start offset.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
core/src/main/scala/kafka/log/Log.scala | 37 +++++++++-----
core/src/test/scala/unit/kafka/log/LogTest.scala | 63 +++++++++++++++++++-----
2 files changed, 77 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 22d1afb..49f18a2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -517,17 +517,7 @@ class Log(@volatile var dir: File,
// before the swap file is restored as the new segment file.
completeSwapOperations(swapFiles)
- if (logSegments.isEmpty) {
- // no existing segments, create a new mutable segment beginning at offset 0
- addSegment(LogSegment.open(dir = dir,
- baseOffset = 0,
- config,
- time = time,
- fileAlreadyExists = false,
- initFileSize = this.initFileSize,
- preallocate = config.preallocate))
- 0
- } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
+ if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
val nextOffset = retryOnOffsetOverflow {
recoverLog()
}
@@ -535,7 +525,9 @@ class Log(@volatile var dir: File,
// reset the index size of the currently active log segment to allow more entries
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
- } else 0
+ } else {
+ 0
+ }
}
private def updateLogEndOffset(messageOffset: Long) {
@@ -573,6 +565,27 @@ class Log(@volatile var dir: File,
}
}
}
+
+ if (logSegments.nonEmpty) {
+ val logEndOffset = activeSegment.readNextOffset
+ if (logEndOffset < logStartOffset) {
+ warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
+ "This could happen if segment files were deleted from the file system.")
+ logSegments.foreach(deleteSegment)
+ }
+ }
+
+ if (logSegments.isEmpty) {
+ // no existing segments, create a new mutable segment beginning at logStartOffset
+ addSegment(LogSegment.open(dir = dir,
+ baseOffset = logStartOffset,
+ config,
+ time = time,
+ fileAlreadyExists = false,
+ initFileSize = this.initFileSize,
+ preallocate = config.preallocate))
+ }
+
recoveryPoint = activeSegment.readNextOffset
recoveryPoint
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9a40bdb..3a9bdf6 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -43,9 +43,9 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions
-import scala.collection.Iterable
+import scala.collection.{Iterable, mutable}
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ListBuffer
import org.scalatest.Assertions.{assertThrows, intercept, withClue}
class LogTest {
@@ -278,6 +278,47 @@ class LogTest {
testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.latestVersion.version)
}
+ @Test
+ def testLogReinitializeAfterManualDelete(): Unit = {
+ val logConfig = LogTest.createLogConfig()
+ // simulate a case where log data does not exist but the start offset is non-zero
+ val log = createLog(logDir, logConfig, logStartOffset = 500)
+ assertEquals(500, log.logStartOffset)
+ assertEquals(500, log.logEndOffset)
+ }
+
+ @Test
+ def testLogEndLessThanStartAfterReopen(): Unit = {
+ val logConfig = LogTest.createLogConfig()
+ var log = createLog(logDir, logConfig)
+ for (i <- 0 until 5) {
+ val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+ log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+ log.roll()
+ }
+ assertEquals(6, log.logSegments.size)
+
+ // Increment the log start offset
+ val startOffset = 4
+ log.maybeIncrementLogStartOffset(startOffset)
+ assertTrue(log.logEndOffset > log.logStartOffset)
+
+ // Append garbage to a segment below the current log start offset
+ val segmentToForceTruncation = log.logSegments.take(2).last
+ val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file))
+ bw.write("corruptRecord")
+ bw.close()
+ log.close()
+
+ // Reopen the log. This will truncate the segment to which we appended garbage and delete all other segments.
+ // All remaining segments will be lower than the current log start offset, which will force deletion of all segments
+ // and recreation of a single, active segment starting at logStartOffset.
+ log = createLog(logDir, logConfig, logStartOffset = startOffset)
+ assertEquals(1, log.logSegments.size)
+ assertEquals(startOffset, log.logStartOffset)
+ assertEquals(startOffset, log.logEndOffset)
+ }
+
private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
var log = createLog(logDir, logConfig)
@@ -296,21 +337,21 @@ class LogTest {
// 1 segment. We collect the data before closing the log.
val offsetForSegmentAfterRecoveryPoint = segmentOffsets(segmentOffsets.size - 3)
val offsetForRecoveryPointSegment = segmentOffsets(segmentOffsets.size - 4)
- val (segOffsetsBeforeRecovery, segOffsetsAfterRecovery) = segmentOffsets.partition(_ < offsetForRecoveryPointSegment)
+ val (segOffsetsBeforeRecovery, segOffsetsAfterRecovery) = segmentOffsets.toSet.partition(_ < offsetForRecoveryPointSegment)
val recoveryPoint = offsetForRecoveryPointSegment + 1
assertTrue(recoveryPoint < offsetForSegmentAfterRecoveryPoint)
log.close()
- val segmentsWithReads = ArrayBuffer[LogSegment]()
- val recoveredSegments = ArrayBuffer[LogSegment]()
- val expectedSegmentsWithReads = ArrayBuffer[Long]()
- val expectedSnapshotOffsets = ArrayBuffer[Long]()
+ val segmentsWithReads = mutable.Set[LogSegment]()
+ val recoveredSegments = mutable.Set[LogSegment]()
+ val expectedSegmentsWithReads = mutable.Set[Long]()
+ val expectedSnapshotOffsets = mutable.Set[Long]()
if (logConfig.messageFormatVersion < KAFKA_0_11_0_IV0) {
expectedSegmentsWithReads += activeSegmentOffset
expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
} else {
- expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Seq(activeSegmentOffset)
+ expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Set(activeSegmentOffset)
expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
}
@@ -351,7 +392,7 @@ class LogTest {
// We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour)
assertEquals(expectedSegmentsWithReads, segmentsWithReads.map(_.baseOffset))
assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
- assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+ assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets.toSet)
log.close()
segmentsWithReads.clear()
recoveredSegments.clear()
@@ -360,9 +401,9 @@ class LogTest {
// avoid reading all segments
ProducerStateManager.deleteSnapshotsBefore(logDir, offsetForRecoveryPointSegment)
log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint)
- assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset))
+ assertEquals(Set(activeSegmentOffset), segmentsWithReads.map(_.baseOffset))
assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
- assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+ assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets.toSet)
// Verify that we keep 2 snapshot files if we checkpoint the log end offset
log.deleteSnapshotsAfterRecoveryPointCheckpoint()