You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:02 UTC
[14/37] git commit: Fatal error during KafkaServerStable startup when
hard-failed broker is re-started; patched by Swapnil Ghike;
reviewed by Jun Rao and Jay Kreps; kafka-757
Fatal error during KafkaServerStable startup when hard-failed broker is re-started; patched by Swapnil Ghike; reviewed by Jun Rao and Jay Kreps; kafka-757
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e81b0a3d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e81b0a3d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e81b0a3d
Branch: refs/heads/trunk
Commit: e81b0a3ded9ff31f5089f2a3f294fbe0aef9614b
Parents: 48745f0
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Feb 13 14:01:57 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Feb 13 14:01:57 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 11 ++++++++++-
core/src/main/scala/kafka/log/OffsetIndex.scala | 18 ++++++++++++------
2 files changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e81b0a3d/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index eee0ed3..d0b26ab 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -192,7 +192,16 @@ private[kafka] class Log(val dir: File,
if(needsRecovery)
recoverSegment(logSegments.get(logSegments.size - 1))
}
- new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
+
+ val segmentList = logSegments.toArray(new Array[LogSegment](logSegments.size))
+ // Check for the index file of every segment, if it's empty or its last offset is greater than its base offset.
+ for (s <- segmentList) {
+ require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
+ "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+ .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
+ }
+
+ new SegmentList(segmentList)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/e81b0a3d/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 0d67242..e806da9 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -92,7 +92,6 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
.format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
- require(entries == 0 || lastOffset > this.baseOffset, "Corrupt index found, index file (%s) has non-zero size but last offset is %d.".format(file.getAbsolutePath, lastOffset))
/* the maximum number of entries this index can hold */
def maxEntries = mmap.limit / 8
@@ -130,7 +129,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* Return -1 if the least entry in the index is larger than the target offset or the index is empty
*/
private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
- // we only store the difference from the baseoffset so calculate that
+ // we only store the difference from the base offset so calculate that
val relOffset = targetOffset - baseOffset
// check if the index is empty
@@ -197,7 +196,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
/**
* Truncate the entire index
*/
- def truncate() = truncateTo(this.baseOffset)
+ def truncate() = truncateToEntries(0)
/**
* Remove all entries from the index which have an offset greater than or equal to the given offset.
@@ -220,11 +219,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
slot
else
slot + 1
- this.size.set(newEntries)
- mmap.position(this.size.get * 8)
- this.lastOffset = readLastOffset
+ truncateToEntries(newEntries)
}
}
+
+ /**
+ * Truncates index to a known number of entries.
+ */
+ private def truncateToEntries(entries: Int) {
+ this.size.set(entries)
+ mmap.position(this.size.get * 8)
+ this.lastOffset = readLastOffset
+ }
/**
* Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from