You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/07/24 02:08:58 UTC
git commit: kafka-1192; Enable DumpLogSegments tool to deserialize messages; patched by Manikumar Reddy; reviewed by Guozhang Wang and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk ff05e9b36 -> 3f1a9c4ce
kafka-1192; Enable DumpLogSegments tool to deserialize messages; patched by Manikumar Reddy; reviewed by Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f1a9c4c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f1a9c4c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f1a9c4c
Branch: refs/heads/trunk
Commit: 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb
Parents: ff05e9b
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Jul 23 17:08:51 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jul 23 17:08:51 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/tools/DumpLogSegments.scala | 27 +++++++++++++++-----
1 file changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f1a9c4c/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 6daf87b..8e9d47b 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -23,7 +23,8 @@ import kafka.log._
import kafka.utils._
import collection.mutable
import joptsimple.OptionParser
-
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
object DumpLogSegments {
@@ -41,7 +42,15 @@ object DumpLogSegments {
.ofType(classOf[java.lang.Integer])
.defaultsTo(5 * 1024 * 1024)
val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration")
-
+ val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
+ .withOptionalArg()
+ .ofType(classOf[java.lang.String])
+ .defaultsTo("kafka.serializer.StringDecoder")
+ val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
+ .withOptionalArg()
+ .ofType(classOf[java.lang.String])
+ .defaultsTo("kafka.serializer.StringDecoder")
+
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
@@ -54,6 +63,9 @@ object DumpLogSegments {
val files = options.valueOf(filesOpt).split(",")
val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
val isDeepIteration = if(options.has(deepIterationOpt)) true else false
+
+ val valueDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
+ val keyDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
@@ -62,7 +74,7 @@ object DumpLogSegments {
val file = new File(arg)
if(file.getName.endsWith(Log.LogFileSuffix)) {
println("Dumping " + file)
- dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize)
+ dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , valueDecoder, keyDecoder)
} else if(file.getName.endsWith(Log.IndexFileSuffix)) {
println("Dumping " + file)
dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
@@ -118,7 +130,9 @@ object DumpLogSegments {
printContents: Boolean,
nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]],
isDeepIteration: Boolean,
- maxMessageSize: Int) {
+ maxMessageSize: Int,
+ valueDecoder: Decoder[_],
+ keyDecoder: Decoder[_]) {
val startOffset = file.getName().split("\\.")(0).toLong
println("Starting offset: " + startOffset)
val messageSet = new FileMessageSet(file, false)
@@ -147,8 +161,8 @@ object DumpLogSegments {
print(" keysize: " + msg.keySize)
if(printContents) {
if(msg.hasKey)
- print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8"))
- val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8")
+ print(" key: " + keyDecoder.fromBytes(Utils.readBytes(messageAndOffset.message.key)))
+ val payload = if(messageAndOffset.message.isNull) null else valueDecoder.fromBytes(Utils.readBytes(messageAndOffset.message.payload))
print(" payload: " + payload)
}
println()
@@ -186,4 +200,5 @@ object DumpLogSegments {
}
}
}
+
}