You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:02 UTC

[14/37] git commit: Fatal error during KafkaServerStable startup when hard-failed broker is re-started; patched by Swapnil Ghike; reviewed by Jun Rao and Jay Kreps; kafka-757

Fatal error during KafkaServerStable startup when hard-failed broker is re-started; patched by Swapnil Ghike; reviewed by Jun Rao and Jay Kreps; kafka-757


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e81b0a3d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e81b0a3d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e81b0a3d

Branch: refs/heads/trunk
Commit: e81b0a3ded9ff31f5089f2a3f294fbe0aef9614b
Parents: 48745f0
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Feb 13 14:01:57 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Feb 13 14:01:57 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |   11 ++++++++++-
 core/src/main/scala/kafka/log/OffsetIndex.scala |   18 ++++++++++++------
 2 files changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e81b0a3d/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index eee0ed3..d0b26ab 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -192,7 +192,16 @@ private[kafka] class Log(val dir: File,
       if(needsRecovery)
         recoverSegment(logSegments.get(logSegments.size - 1))
     }
-    new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
+
+    val segmentList = logSegments.toArray(new Array[LogSegment](logSegments.size))
+    // Check for the index file of every segment, if it's empty or its last offset is greater than its base offset.
+    for (s <- segmentList) {
+      require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
+              "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+                .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
+    }
+
+    new SegmentList(segmentList)
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e81b0a3d/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 0d67242..e806da9 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -92,7 +92,6 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   
   info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
-  require(entries == 0 || lastOffset > this.baseOffset, "Corrupt index found, index file (%s) has non-zero size but last offset is %d.".format(file.getAbsolutePath, lastOffset))
 
   /* the maximum number of entries this index can hold */
   def maxEntries = mmap.limit / 8
@@ -130,7 +129,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * Return -1 if the least entry in the index is larger than the target offset or the index is empty
    */
   private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
-    // we only store the difference from the baseoffset so calculate that
+    // we only store the difference from the base offset so calculate that
     val relOffset = targetOffset - baseOffset
     
     // check if the index is empty
@@ -197,7 +196,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   /**
    * Truncate the entire index
    */
-  def truncate() = truncateTo(this.baseOffset)
+  def truncate() = truncateToEntries(0)
   
   /**
    * Remove all entries from the index which have an offset greater than or equal to the given offset.
@@ -220,11 +219,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
           slot
         else
           slot + 1
-      this.size.set(newEntries)
-      mmap.position(this.size.get * 8)
-      this.lastOffset = readLastOffset
+      truncateToEntries(newEntries)
     }
   }
+
+  /**
+   * Truncates index to a known number of entries.
+   */
+  private def truncateToEntries(entries: Int) {
+    this.size.set(entries)
+    mmap.position(this.size.get * 8)
+    this.lastOffset = readLastOffset
+  }
   
   /**
    * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from