You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/02/29 18:14:58 UTC
kafka git commit: KAFKA-3291; DumpLogSegment tool should also provide an option to only…
Repository: kafka
Updated Branches:
refs/heads/trunk d4e60b9f5 -> 92c35230f
KAFKA-3291; DumpLogSegment tool should also provide an option to only…
… verify index sanity.
Author: Parth Brahmbhatt <br...@gmail.com>
Reviewers: Gwen Shapira <cs...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Grant Henke <gr...@gmail.com>, Sriharsha Chintalapani <ma...@harsha.io>
Closes #975 from Parth-Brahmbhatt/KAFKA-3291
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92c35230
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92c35230
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92c35230
Branch: refs/heads/trunk
Commit: 92c35230f463e570de17239d8e76c7be5350c7d8
Parents: d4e60b9
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Feb 29 09:14:38 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Feb 29 09:14:38 2016 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/tools/DumpLogSegments.scala | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92c35230/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 3c41c7c..afba1ad 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -38,6 +38,8 @@ object DumpLogSegments {
val parser = new OptionParser
val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs")
val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content")
+ val indexSanityOpt = parser.accepts("index-sanity-check", "if set, just checks the index sanity without printing its content. " +
+ "This is the same check that is executed on broker startup to determine if an index needs rebuilding or not.")
val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped")
.withRequiredArg
.describedAs("file1, file2, ...")
@@ -68,6 +70,8 @@ object DumpLogSegments {
val print = if(options.has(printOpt)) true else false
val verifyOnly = if(options.has(verifyOpt)) true else false
+ val indexSanityOnly = if(options.has(indexSanityOpt)) true else false
+
val files = options.valueOf(filesOpt).split(",")
val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
val isDeepIteration = if(options.has(deepIterationOpt)) true else false
@@ -90,7 +94,7 @@ object DumpLogSegments {
dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser)
} else if(file.getName.endsWith(Log.IndexFileSuffix)) {
println("Dumping " + file)
- dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
+ dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
}
}
misMatchesForIndexFilesMap.foreach {
@@ -113,6 +117,7 @@ object DumpLogSegments {
/* print out the contents of the index */
private def dumpIndex(file: File,
+ indexSanityOnly: Boolean,
verifyOnly: Boolean,
misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]],
maxMessageSize: Int) {
@@ -120,6 +125,14 @@ object DumpLogSegments {
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
val messageSet = new FileMessageSet(logFile, false)
val index = new OffsetIndex(file = file, baseOffset = startOffset)
+
+ //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
+ if (indexSanityOnly) {
+ index.sanityCheck
+ println(s"$file passed sanity check.")
+ return
+ }
+
for(i <- 0 until index.entries) {
val entry = index.entry(i)
val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, maxMessageSize)