You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this conversion.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/11/06 00:37:24 UTC

svn commit: r1406009 - /incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala

Author: junrao
Date: Mon Nov  5 23:37:24 2012
New Revision: 1406009

URL: http://svn.apache.org/viewvc?rev=1406009&view=rev
Log:
Extend DumpLogSegments to verify consistency between data and index files; patched by Yang Ye; reviewed by Jun Rao; KAFKA-577

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala?rev=1406009&r1=1406008&r2=1406009&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala Mon Nov  5 23:37:24 2012
@@ -21,46 +21,116 @@ import java.io._
 import kafka.message._
 import kafka.log._
 import kafka.utils._
+import collection.mutable
+import joptsimple.OptionParser
+
 
 object DumpLogSegments {
 
   def main(args: Array[String]) {
-    val print = args.contains("--print")
-    val files = args.filter(_ != "--print")
+    val parser = new OptionParser
+    val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs")
+                           .withOptionalArg
+                           .describedAs("print data log content")
+                           .ofType(classOf[java.lang.Boolean])
+                           .defaultsTo(false)
+
+    val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content")
+                           .withOptionalArg
+                           .describedAs("just verify the index log")
+                           .ofType(classOf[java.lang.Boolean])
+                           .defaultsTo(false)
+
+    val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped")
+                           .withRequiredArg
+                           .describedAs("file1, file2, ...")
+                           .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+    if(!options.has(filesOpt)) {
+      System.err.println("Missing required argument \"" + filesOpt + "\"")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    val print = if(options.has(printOpt)) true else false
+    val verifyOnly = if(options.has(verifyOpt)) true else false
+    val files = options.valueOf(filesOpt).split(",")
+
+    val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Int, Int)]]
+    val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Int, Int)]]
 
     for(arg <- files) {
       val file = new File(arg)
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        dumpLog(file, print)
+        dumpLog(file, print, nonConsecutivePairsForLogFilesMap)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
-        dumpIndex(file)
+        dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap)
+      }
+    }
+    misMatchesForIndexFilesMap.foreach {
+      case (fileName, listOfMismatches) => {
+        System.err.println("Mismatches in :" + fileName)
+        listOfMismatches.foreach(m => {
+          System.err.println("  Index position: %d, log position: %d".format(m._1, m._2))
+        })
+      }
+    }
+    nonConsecutivePairsForLogFilesMap.foreach {
+      case (fileName, listOfNonSecutivePairs) => {
+        System.err.println("Non-secutive offsets in :" + fileName)
+        listOfNonSecutivePairs.foreach(m => {
+          System.err.println("  %d is followed by %d".format(m._1, m._2))
+        })
       }
     }
   }
   
   /* print out the contents of the index */
-  def dumpIndex(file: File) {
+  private def dumpIndex(file: File, verifyOnly: Boolean, misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Int, Int)]]) {
     val startOffset = file.getName().split("\\.")(0).toLong
+    val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
+    val logFile = new File(logFileName)
+    val messageSet = new FileMessageSet(logFile)
     val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
+      val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, messageSet.sizeInBytes())
+      val messageAndOffset = partialFileMessageSet.head
+      if(messageAndOffset.offset != entry.offset + index.baseOffset) {
+        var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getName, List[(Int, Int)]())
+        misMatchesSeq ::=((entry.offset + index.baseOffset, messageAndOffset.offset).asInstanceOf[(Int, Int)])
+        misMatchesForIndexFilesMap.put(file.getName, misMatchesSeq)
+      }
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
       if(entry.offset == 0 && i > 0)
         return
-      println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
+      if (!verifyOnly)
+        println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
     }
   }
   
   /* print out the contents of the log */
-  def dumpLog(file: File, printContents: Boolean) {
+  private def dumpLog(file: File, printContents: Boolean, nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Int, Int)]]) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file)
     var validBytes = 0L
+    var lastOffset = -1l
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
+
+      if(lastOffset == -1)
+        lastOffset = messageAndOffset.offset
+      else if (messageAndOffset.offset != lastOffset +1) {
+        var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Int, Int)]())
+        nonConsecutivePairsSeq ::=((lastOffset, messageAndOffset.offset).asInstanceOf[(Int, Int)])
+        nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
+      }
+      lastOffset = messageAndOffset.offset
+
       print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
             " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
             " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
@@ -78,5 +148,4 @@ object DumpLogSegments {
     if(trailingBytes > 0)
       println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
-  
 }