You are viewing a plain-text version of this content. (The canonical link to the original was a hyperlink that is not preserved in this plain-text rendering.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/19 03:31:48 UTC
git commit: KAFKA-1112;
broker can not start itself after kafka is killed with -9;
patched by Jay Kreps and Jun Rao;
reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang
Updated Branches:
refs/heads/trunk eedbea652 -> 7c54e39bd
KAFKA-1112; broker can not start itself after kafka is killed with -9; patched by Jay Kreps and Jun Rao; reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c54e39b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c54e39b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c54e39b
Branch: refs/heads/trunk
Commit: 7c54e39bd48c9908c220ee68cee608a0d0cf5d9d
Parents: eedbea6
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Nov 18 18:31:32 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 18 18:31:32 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/log/FileMessageSet.scala | 2 +
core/src/main/scala/kafka/log/Log.scala | 28 +++++++-------
core/src/main/scala/kafka/log/LogManager.scala | 2 +
core/src/main/scala/kafka/log/OffsetIndex.scala | 40 ++++++++++++--------
.../scala/unit/kafka/log/LogSegmentTest.scala | 5 +--
.../src/test/scala/unit/kafka/log/LogTest.scala | 22 +++++------
.../test/scala/unit/kafka/utils/TestUtils.scala | 10 ++++-
7 files changed, 62 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 6c099da..e1f8b97 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -123,6 +123,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
if(offset >= targetOffset)
return OffsetPosition(offset, position)
val messageSize = buffer.getInt()
+ if(messageSize < Message.MessageOverhead)
+ throw new IllegalStateException("Invalid message size: " + messageSize)
position += MessageSet.LogOverhead + messageSize
}
null
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9205128..1883a53 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -155,26 +155,19 @@ class Log(val dir: File,
activeSegment.index.resize(config.maxIndexSize)
}
- // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset.
- for (s <- logSegments) {
- require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
- "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
- .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
- }
+ // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
+ for (s <- logSegments)
+ s.index.sanityCheck()
}
private def recoverLog() {
- val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
- val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists()
- if(!needsRecovery) {
- this.recoveryPoint = lastOffset
- return
- }
- if(lastOffset <= this.recoveryPoint) {
- info("Log '%s' is fully intact, skipping recovery".format(name))
- this.recoveryPoint = lastOffset
+ // if we have the clean shutdown marker, skip recovery
+ if(hasCleanShutdownFile) {
+ this.recoveryPoint = activeSegment.nextOffset
return
}
+
+ // okay we need to actually recover this log
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
while(unflushed.hasNext) {
val curr = unflushed.next
@@ -196,6 +189,11 @@ class Log(val dir: File,
}
}
}
+
+ /**
+ * Whether the clean shutdown marker file (CleanShutdownFile) exists in the parent of this log's directory.
+ */
+ private def hasCleanShutdownFile(): Boolean = {
+ val cleanShutdownMarker = new File(dir.getParentFile, CleanShutdownFile)
+ cleanShutdownMarker.exists()
+ }
/**
* The number of segments in the log.
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 390b759..81be88a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -175,6 +175,8 @@ class LogManager(val logDirs: Array[File],
allLogs.foreach(_.close())
// update the last flush point
checkpointRecoveryPointOffsets()
+ // mark that the shutdown was clean by creating the clean shutdown marker file
+ logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()))
} finally {
// regardless of whether the close succeeded, we need to unlock the data directories
dirLocks.foreach(_.destroy())
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 2f4e303..96571b3 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -69,12 +69,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
raf.setLength(roundToExactMultiple(maxIndexSize, 8))
}
- val len = raf.length()
- if(len < 0 || len % 8 != 0)
- throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len +
- " bytes which is not positive or not a multiple of 8.")
-
/* memory-map the file */
+ val len = raf.length()
val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
/* set the position in the index for the next entry */
@@ -99,22 +95,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
var maxEntries = mmap.limit / 8
/* the last offset in the index */
- var lastOffset = readLastOffset()
+ var lastOffset = readLastEntry.offset
debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
.format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
/**
- * The last offset written to the index
+ * The last entry in the index
+ * as an OffsetPosition of (absolute offset, physical position), or
+ * OffsetPosition(baseOffset, 0) when the index has no entries.
*/
- private def readLastOffset(): Long = {
+ def readLastEntry(): OffsetPosition = {
inLock(lock) {
- val offset =
- size.get match {
- case 0 => 0
- case s => relativeOffset(this.mmap, s-1)
- }
- baseOffset + offset
+ size.get match {
+ case 0 => OffsetPosition(baseOffset, 0)
+ // entry s-1 is the last one; stored offsets are relative to baseOffset, so add it back
+ case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
+ }
}
}
@@ -179,7 +173,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
/* return the nth offset relative to the base offset */
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
- /* return the nth physical offset */
+ /* return the physical position stored in the nth entry (the second 4-byte int of its 8-byte slot) */
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
/**
@@ -258,7 +252,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
inLock(lock) {
this.size.set(entries)
mmap.position(this.size.get * 8)
- this.lastOffset = readLastOffset
+ this.lastOffset = readLastEntry.offset
}
}
@@ -351,6 +345,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
}
/**
+ * Do a basic sanity check on this index to detect obvious problems.
+ * Verifies that a non-empty index has a last offset strictly greater than its base offset,
+ * and that the index file length is an exact multiple of the 8-byte entry size.
+ * @throws IllegalArgumentException if any problems are found
+ */
+ def sanityCheck(): Unit = {
+ require(entries == 0 || lastOffset > baseOffset,
+ "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+ .format(file.getAbsolutePath, lastOffset, baseOffset))
+ // File.length() is never negative (0 for a missing file), so only the entry-size alignment needs checking
+ val len = file.length()
+ require(len % 8 == 0,
+ "Index file %s is corrupt, found %d bytes which is not a multiple of 8."
+ .format(file.getAbsolutePath, len))
+ }
+
+ /**
* Round a number to the greatest exact multiple of the given factor less than the given number.
* E.g. roundToExactMultiple(67, 8) == 64
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 5f2c2e8..6b76037 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -212,15 +212,14 @@ class LogSegmentTest extends JUnit3Suite {
*/
@Test
def testRecoveryWithCorruptMessage() {
- val rand = new Random(1)
val messagesAppended = 20
for(iteration <- 0 until 10) {
val seg = createSegment(0)
for(i <- 0 until messagesAppended)
seg.append(i, messages(i, i.toString))
- val offsetToBeginCorruption = rand.nextInt(messagesAppended)
+ val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
// start corrupting somewhere in the middle of the chosen record all the way to the end
- val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15)
+ val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15)
TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
seg.recover(64*1024)
assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1571f1e..1da1393 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -592,29 +592,29 @@ class LogTest extends JUnitSuite {
val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
val set = TestUtils.singleMessageSet("test".getBytes())
val recoveryPoint = 50L
- for(iteration <- 0 until 10) {
+ for(iteration <- 0 until 50) {
// create a log and write some messages to it
+ logDir.mkdirs()
var log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
- for(i <- 0 until 100)
+ val numMessages = 50 + TestUtils.random.nextInt(50)
+ for(i <- 0 until numMessages)
log.append(set)
- val seg = log.logSegments(0, recoveryPoint).last
- val index = seg.index
- val messages = seg.log
- val filePosition = messages.searchFor(recoveryPoint, 0).position
- val indexPosition = index.lookup(recoveryPoint).position
+ val messages = log.logSegments.flatMap(_.log.iterator.toList)
log.close()
- // corrupt file
- TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
- TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
+ // corrupt index and log by appending random bytes
+ TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
+ TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
// attempt recovery
log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
- assertEquals(recoveryPoint, log.logEndOffset)
+ assertEquals(numMessages, log.logEndOffset)
+ assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
+ Utils.rm(logDir)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 777b315..d88b6c3 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -518,9 +518,15 @@ object TestUtils extends Logging {
+ /**
+ * Write `size` random bytes into the given file starting at byte `position`,
+ * overwriting any existing content there. Each byte is drawn from 0 to 254.
+ * The file is always closed, even if seeking or writing fails.
+ */
def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
val file = new RandomAccessFile(fileName, "rw")
+ try {
file.seek(position)
- val rand = new Random
for(i <- 0 until size)
- file.writeByte(rand.nextInt(255))
+ file.writeByte(random.nextInt(255))
+ } finally {
+ file.close()
+ }
+ }
+
+ /**
+ * Append `size` random bytes to the end of the given file. This is used to corrupt log and index files in tests.
+ * Each byte is drawn from 0 to 254. The stream is always closed, even if a write fails.
+ */
+ def appendNonsenseToFile(fileName: File, size: Int) {
+ val file = new FileOutputStream(fileName, true)
+ try {
+ for(i <- 0 until size)
+ file.write(random.nextInt(255))
+ } finally {
file.close()
+ }
}