Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/20 00:33:06 UTC

git commit: Support deep iteration in DumpLogSegments tool; patched by Jun Rao; reviewed by Neha Narkhede; kafka-812

Updated Branches:
  refs/heads/0.8 46ebdc16e -> a376f9221


Support deep iteration in DumpLogSegments tool; patched by Jun Rao; reviewed by Neha Narkhede; kafka-812
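
To try the new options, a minimal invocation sketch in Scala, calling the
tool's main() directly; the segment path and the wrapper object are
illustrative, not part of this patch:

    // Sketch only: assumes kafka core on the classpath and an existing
    // segment file at the path below.
    object DumpLogSegmentsExample {
      def main(args: Array[String]): Unit = {
        kafka.tools.DumpLogSegments.main(Array(
          "--files", "/tmp/kafka-logs/test-0/00000000000000000000.log",
          "--deep-iteration",               // new: expand compressed messages while dumping the log
          "--max-message-size", "5242880")) // new: read size used when verifying index entries (default 5MB)
      }
    }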


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a376f922
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a376f922
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a376f922

Branch: refs/heads/0.8
Commit: a376f922149330995f91d427d76ff1595fbd26ce
Parents: 46ebdc1
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Mar 19 16:32:59 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 19 16:32:59 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   98 +++++++++++----
 1 files changed, 71 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a376f922/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 1bed554..06e6437 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -35,6 +35,12 @@ object DumpLogSegments {
                            .withRequiredArg
                            .describedAs("file1, file2, ...")
                            .ofType(classOf[String])
+    val maxMessageSizeOpt = parser.accepts("max-message-size", "Size of largest message.")
+                                  .withRequiredArg
+                                  .describedAs("size")
+                                  .ofType(classOf[java.lang.Integer])
+                                  .defaultsTo(5 * 1024 * 1024)
+    val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration")
 
     val options = parser.parse(args : _*)
     if(!options.has(filesOpt)) {
@@ -46,6 +52,8 @@ object DumpLogSegments {
     val print = if(options.has(printOpt)) true else false
     val verifyOnly = if(options.has(verifyOpt)) true else false
     val files = options.valueOf(filesOpt).split(",")
+    val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
+    val isDeepIteration = if(options.has(deepIterationOpt)) true else false
 
     val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
     val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
@@ -54,17 +62,17 @@ object DumpLogSegments {
       val file = new File(arg)
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        dumpLog(file, print, nonConsecutivePairsForLogFilesMap)
+        dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
-        dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap)
+        dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
       }
     }
     misMatchesForIndexFilesMap.foreach {
       case (fileName, listOfMismatches) => {
         System.err.println("Mismatches in :" + fileName)
         listOfMismatches.foreach(m => {
-          System.err.println("  Index position: %d, log position: %d".format(m._1, m._2))
+          System.err.println("  Index offset: %d, log offset: %d".format(m._1, m._2))
         })
       }
     }
@@ -79,7 +87,10 @@ object DumpLogSegments {
   }
   
   /* print out the contents of the index */
-  private def dumpIndex(file: File, verifyOnly: Boolean, misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]]) {
+  private def dumpIndex(file: File,
+                        verifyOnly: Boolean,
+                        misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]],
+                        maxMessageSize: Int) {
     val startOffset = file.getName().split("\\.")(0).toLong
     val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
     val logFile = new File(logFileName)
@@ -87,8 +98,8 @@ object DumpLogSegments {
     val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
-      val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, messageSet.sizeInBytes())
-      val messageAndOffset = partialFileMessageSet.head
+      val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, maxMessageSize)
+      val messageAndOffset = getIterator(partialFileMessageSet.head, isDeepIteration = true).next()
       if(messageAndOffset.offset != entry.offset + index.baseOffset) {
         var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getName, List[(Long, Long)]())
         misMatchesSeq ::=(entry.offset + index.baseOffset, messageAndOffset.offset)
@@ -103,40 +114,73 @@ object DumpLogSegments {
   }
   
   /* print out the contents of the log */
-  private def dumpLog(file: File, printContents: Boolean, nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]]) {
+  private def dumpLog(file: File,
+                      printContents: Boolean,
+                      nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]],
+                      isDeepIteration: Boolean) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file)
     var validBytes = 0L
     var lastOffset = -1l
-    for(messageAndOffset <- messageSet) {
-      val msg = messageAndOffset.message
+    for(shallowMessageAndOffset <- messageSet) { // this only does shallow iteration
+      val itr = getIterator(shallowMessageAndOffset, isDeepIteration)
+      for (messageAndOffset <- itr) {
+        val msg = messageAndOffset.message
 
-      if(lastOffset == -1)
+        if(lastOffset == -1)
+          lastOffset = messageAndOffset.offset
+        // If we are iterating uncompressed messages, offsets must be consecutive
+        else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
+          var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
+          nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
+          nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
+        }
         lastOffset = messageAndOffset.offset
-      // If it's uncompressed message, its offset must be lastOffset + 1 no matter last message is compressed or uncompressed
-      else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
-        var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
-        nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
-        nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
-      }
-      lastOffset = messageAndOffset.offset
 
-      print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
-            " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
-            " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
-      validBytes += MessageSet.entrySize(msg)
-      if(msg.hasKey)
-        print(" keysize: " + msg.keySize)
-      if(printContents) {
+        print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
+              " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
+              " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
         if(msg.hasKey)
-          print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
-        print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+          print(" keysize: " + msg.keySize)
+        if(printContents) {
+          if(msg.hasKey)
+            print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+          print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+        }
+        println()
       }
-      println()
+      validBytes += MessageSet.entrySize(shallowMessageAndOffset.message)
     }
     val trailingBytes = messageSet.sizeInBytes - validBytes
     if(trailingBytes > 0)
       println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
+
+  private def getIterator(messageAndOffset: MessageAndOffset, isDeepIteration: Boolean) = {
+    if (isDeepIteration) {
+      val message = messageAndOffset.message
+      message.compressionCodec match {
+        case NoCompressionCodec =>
+          getSingleMessageIterator(messageAndOffset)
+        case _ =>
+          ByteBufferMessageSet.decompress(message).iterator
+      }
+    } else
+      getSingleMessageIterator(messageAndOffset)
+  }
+
+  private def getSingleMessageIterator(messageAndOffset: MessageAndOffset) = {
+    new IteratorTemplate[MessageAndOffset] {
+      var messageIterated = false
+
+      override def makeNext(): MessageAndOffset = {
+        if (!messageIterated) {
+          messageIterated = true
+          messageAndOffset
+        } else
+          allDone()
+      }
+    }
+  }
 }
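
A note on the pattern above: getSingleMessageIterator adapts a single
MessageAndOffset into a one-shot iterator, so shallow and deep traversal
share the same inner loop in dumpLog, and dumpIndex can reuse the deep
path when verifying index entries. A minimal standalone sketch of the
same idea in plain Scala; the names below are illustrative, not Kafka
APIs:

    object DeepIterationSketch {
      // Stand-in for a log entry: either a plain message or a compressed
      // batch that expands to several messages.
      sealed trait Entry
      case class Plain(value: String) extends Entry
      case class Batch(values: List[String]) extends Entry

      // Analogue of getSingleMessageIterator: wrap one entry as an iterator.
      def single(e: Entry): Iterator[Entry] = Iterator.single(e)

      // Analogue of getIterator: deep iteration expands a Batch (as
      // ByteBufferMessageSet.decompress does for a compressed message);
      // shallow iteration returns the wrapper itself.
      def entries(e: Entry, deep: Boolean): Iterator[Entry] = e match {
        case Batch(vs) if deep => vs.map(Plain(_)).iterator
        case _                 => single(e)
      }

      def main(args: Array[String]): Unit = {
        val batch = Batch(List("a", "b", "c"))
        println(entries(batch, deep = true).toList)  // List(Plain(a), Plain(b), Plain(c))
        println(entries(batch, deep = false).toList) // List(Batch(List(a, b, c)))
      }
    }

Keeping both paths behind a common Iterator is what lets dumpLog count
validBytes at the shallow (wrapper) level while still printing every
inner message of a compressed set.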