You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/19 03:31:48 UTC

git commit: KAFKA-1112; broker can not start itself after kafka is killed with -9; patched by Jay Kreps and Jun Rao; reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang

Updated Branches:
  refs/heads/trunk eedbea652 -> 7c54e39bd


KAFKA-1112; broker can not start itself after kafka is killed with -9; patched by Jay Kreps and Jun Rao; reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c54e39b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c54e39b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c54e39b

Branch: refs/heads/trunk
Commit: 7c54e39bd48c9908c220ee68cee608a0d0cf5d9d
Parents: eedbea6
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Nov 18 18:31:32 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 18 18:31:32 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/log/FileMessageSet.scala   |  2 +
 core/src/main/scala/kafka/log/Log.scala         | 28 +++++++-------
 core/src/main/scala/kafka/log/LogManager.scala  |  2 +
 core/src/main/scala/kafka/log/OffsetIndex.scala | 40 ++++++++++++--------
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  5 +--
 .../src/test/scala/unit/kafka/log/LogTest.scala | 22 +++++------
 .../test/scala/unit/kafka/utils/TestUtils.scala | 10 ++++-
 7 files changed, 62 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 6c099da..e1f8b97 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -123,6 +123,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
       if(offset >= targetOffset)
         return OffsetPosition(offset, position)
       val messageSize = buffer.getInt()
+      if(messageSize < Message.MessageOverhead)
+        throw new IllegalStateException("Invalid message size: " + messageSize)
       position += MessageSet.LogOverhead + messageSize
     }
     null

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9205128..1883a53 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -155,26 +155,19 @@ class Log(val dir: File,
       activeSegment.index.resize(config.maxIndexSize)
     }
 
-    // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset.
-    for (s <- logSegments) {
-      require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
-              "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-              .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
-    }
+    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
+    for (s <- logSegments)
+      s.index.sanityCheck()
   }
   
   private def recoverLog() {
-    val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
-    val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists()
-    if(!needsRecovery) {
-      this.recoveryPoint = lastOffset
-      return
-    }
-    if(lastOffset <= this.recoveryPoint) {
-      info("Log '%s' is fully intact, skipping recovery".format(name))
-      this.recoveryPoint = lastOffset
+    // if we have the clean shutdown marker, skip recovery
+    if(hasCleanShutdownFile) {
+      this.recoveryPoint = activeSegment.nextOffset
       return
     }
+
+    // okay we need to actually recover this log
     val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
     while(unflushed.hasNext) {
       val curr = unflushed.next
@@ -196,6 +189,11 @@ class Log(val dir: File,
       }
     }
   }
+  
+  /**
+   * Check if we have the "clean shutdown" file
+   */
+  private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()
 
   /**
    * The number of segments in the log.

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 390b759..81be88a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -175,6 +175,8 @@ class LogManager(val logDirs: Array[File],
       allLogs.foreach(_.close())
       // update the last flush point
       checkpointRecoveryPointOffsets()
+      // mark that the shutdown was clean by creating the clean shutdown marker file
+      logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()))
     } finally {
       // regardless of whether the close succeeded, we need to unlock the data directories
       dirLocks.foreach(_.destroy())

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 2f4e303..96571b3 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -69,12 +69,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
           raf.setLength(roundToExactMultiple(maxIndexSize, 8))
         }
           
-        val len = raf.length()  
-        if(len < 0 || len % 8 != 0)
-          throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
-                                          " bytes which is not positive or not a multiple of 8.")
-          
         /* memory-map the file */
+        val len = raf.length()
         val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
           
         /* set the position in the index for the next entry */
@@ -99,22 +95,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   var maxEntries = mmap.limit / 8
   
   /* the last offset in the index */
-  var lastOffset = readLastOffset()
+  var lastOffset = readLastEntry.offset
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
 
   /**
-   * The last offset written to the index
+   * The last entry in the index
    */
-  private def readLastOffset(): Long = {
+  def readLastEntry(): OffsetPosition = {
     inLock(lock) {
-      val offset = 
-        size.get match {
-          case 0 => 0
-          case s => relativeOffset(this.mmap, s-1)
-        }
-      baseOffset + offset
+      size.get match {
+        case 0 => OffsetPosition(baseOffset, 0)
+        case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
+      }
     }
   }
 
@@ -179,7 +173,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /* return the nth offset relative to the base offset */
   private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
   
-  /* return the nth physical offset */
+  /* return the nth physical position */
   private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
   
   /**
@@ -258,7 +252,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
     inLock(lock) {
       this.size.set(entries)
       mmap.position(this.size.get * 8)
-      this.lastOffset = readLastOffset
+      this.lastOffset = readLastEntry.offset
     }
   }
   
@@ -351,6 +345,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   }
   
   /**
+   * Do a basic sanity check on this index to detect obvious problems
+   * @throws IllegalArgumentException if any problems are found
+   */
+  def sanityCheck() {
+    require(entries == 0 || lastOffset > baseOffset,
+            "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+            .format(file.getAbsolutePath, lastOffset, baseOffset))
+      val len = file.length()
+      require(len % 8 == 0, 
+              "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
+              " bytes which is not positive or not a multiple of 8.")
+  }
+  
+  /**
    * Round a number to the greatest exact multiple of the given factor less than the given number.
    * E.g. roundToExactMultiple(67, 8) == 64
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 5f2c2e8..6b76037 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -212,15 +212,14 @@ class LogSegmentTest extends JUnit3Suite {
    */
   @Test
   def testRecoveryWithCorruptMessage() {
-    val rand = new Random(1)
     val messagesAppended = 20
     for(iteration <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
         seg.append(i, messages(i, i.toString))
-      val offsetToBeginCorruption = rand.nextInt(messagesAppended)
+      val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
-      val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15)
+      val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
       seg.recover(64*1024)
       assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1571f1e..1da1393 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -592,29 +592,29 @@ class LogTest extends JUnitSuite {
     val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
     val set = TestUtils.singleMessageSet("test".getBytes())
     val recoveryPoint = 50L
-    for(iteration <- 0 until 10) {
+    for(iteration <- 0 until 50) {
       // create a log and write some messages to it
+      logDir.mkdirs()
       var log = new Log(logDir,
                         config,
                         recoveryPoint = 0L,
                         time.scheduler,
                         time)
-      for(i <- 0 until 100)
+      val numMessages = 50 + TestUtils.random.nextInt(50)
+      for(i <- 0 until numMessages)
         log.append(set)
-      val seg = log.logSegments(0, recoveryPoint).last
-      val index = seg.index
-      val messages = seg.log
-      val filePosition = messages.searchFor(recoveryPoint, 0).position
-      val indexPosition = index.lookup(recoveryPoint).position
+      val messages = log.logSegments.flatMap(_.log.iterator.toList)
       log.close()
       
-      // corrupt file
-      TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
-      TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
+      // corrupt index and log by appending random bytes
+      TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
+      TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
       
       // attempt recovery
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
-      assertEquals(recoveryPoint, log.logEndOffset)
+      assertEquals(numMessages, log.logEndOffset)
+      assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
+      Utils.rm(logDir)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c54e39b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 777b315..d88b6c3 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -518,9 +518,15 @@ object TestUtils extends Logging {
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
     val file = new RandomAccessFile(fileName, "rw")
     file.seek(position)
-    val rand = new Random
     for(i <- 0 until size)
-      file.writeByte(rand.nextInt(255))
+      file.writeByte(random.nextInt(255))
+    file.close()
+  }
+  
+  def appendNonsenseToFile(fileName: File, size: Int) {
+    val file = new FileOutputStream(fileName, true)
+    for(i <- 0 until size)
+      file.write(random.nextInt(255))
     file.close()
   }