You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/07/24 02:08:58 UTC

git commit: kafka-1192; Enable DumpLogSegments tool to deserialize messages; patched by Manikumar Reddy; reviewed by Guozhang Wang and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk ff05e9b36 -> 3f1a9c4ce


kafka-1192; Enable DumpLogSegments tool to deserialize messages; patched by Manikumar Reddy; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f1a9c4c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f1a9c4c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f1a9c4c

Branch: refs/heads/trunk
Commit: 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb
Parents: ff05e9b
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Jul 23 17:08:51 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jul 23 17:08:51 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/DumpLogSegments.scala     | 27 +++++++++++++++-----
 1 file changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f1a9c4c/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 6daf87b..8e9d47b 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -23,7 +23,8 @@ import kafka.log._
 import kafka.utils._
 import collection.mutable
 import joptsimple.OptionParser
-
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
 
 object DumpLogSegments {
 
@@ -41,7 +42,15 @@ object DumpLogSegments {
                                   .ofType(classOf[java.lang.Integer])
                                   .defaultsTo(5 * 1024 * 1024)
     val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration")
-    
+    val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
+                               .withOptionalArg()
+                               .ofType(classOf[java.lang.String])
+                               .defaultsTo("kafka.serializer.StringDecoder")
+    val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
+                               .withOptionalArg()
+                               .ofType(classOf[java.lang.String])
+                               .defaultsTo("kafka.serializer.StringDecoder")
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
 
@@ -54,6 +63,9 @@ object DumpLogSegments {
     val files = options.valueOf(filesOpt).split(",")
     val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
     val isDeepIteration = if(options.has(deepIterationOpt)) true else false
+  
+    val valueDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
+    val keyDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
 
     val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
     val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
@@ -62,7 +74,7 @@ object DumpLogSegments {
       val file = new File(arg)
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize)
+        dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , valueDecoder, keyDecoder)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
         dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
@@ -118,7 +130,9 @@ object DumpLogSegments {
                       printContents: Boolean,
                       nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]],
                       isDeepIteration: Boolean,
-                      maxMessageSize: Int) {
+                      maxMessageSize: Int,
+                      valueDecoder: Decoder[_],
+                      keyDecoder: Decoder[_]) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file, false)
@@ -147,8 +161,8 @@ object DumpLogSegments {
           print(" keysize: " + msg.keySize)
         if(printContents) {
           if(msg.hasKey)
-            print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8"))
-          val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8")
+            print(" key: " + keyDecoder.fromBytes(Utils.readBytes(messageAndOffset.message.key)))
+          val payload = if(messageAndOffset.message.isNull) null else valueDecoder.fromBytes(Utils.readBytes(messageAndOffset.message.payload))
           print(" payload: " + payload)
         }
         println()
@@ -186,4 +200,5 @@ object DumpLogSegments {
       }
     }
   }
+  
 }