You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/04/26 20:46:41 UTC
kafka git commit: KAFKA-5108;
Add support for reading PID snapshot files to DumpLogSegments
Repository: kafka
Updated Branches:
refs/heads/trunk d06f2cc7a -> 8d8ab2ebc
KAFKA-5108; Add support for reading PID snapshot files to DumpLogSegments
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2922 from hachikuji/KAFKA-5108
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d8ab2eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d8ab2eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d8ab2eb
Branch: refs/heads/trunk
Commit: 8d8ab2ebcd498982a9dd6be873f4d7cc032c65d8
Parents: d06f2cc
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Apr 26 13:46:35 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Apr 26 13:46:35 2017 -0700
----------------------------------------------------------------------
.../scala/kafka/log/ProducerIdMapping.scala | 6 ++--
.../scala/kafka/tools/DumpLogSegments.scala | 38 ++++++++++++++------
2 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8ab2eb/core/src/main/scala/kafka/log/ProducerIdMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
index fc6ff0b..bcadce5 100644
--- a/core/src/main/scala/kafka/log/ProducerIdMapping.scala
+++ b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
@@ -95,7 +95,7 @@ private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEnt
ProducerIdEntry(epoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp)
}
-private[log] class CorruptSnapshotException(msg: String) extends KafkaException(msg)
+class CorruptSnapshotException(msg: String) extends KafkaException(msg)
object ProducerIdMapping {
private val PidSnapshotVersion: Short = 1
@@ -127,7 +127,7 @@ object ProducerIdMapping {
new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
- private def readSnapshot(file: File): Iterable[(Long, ProducerIdEntry)] = {
+ def readSnapshot(file: File): Iterable[(Long, ProducerIdEntry)] = {
val buffer = Files.readAllBytes(file.toPath)
val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
@@ -138,7 +138,7 @@ object ProducerIdMapping {
val crc = struct.getUnsignedInt(CrcField)
val computedCrc = Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
if (crc != computedCrc)
- throw new CorruptSnapshotException(s"Snapshot file is corrupted (CRC is no longer valid). Stored crc: ${crc}. Computed crc: ${computedCrc}")
+ throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). Stored crc: $crc. Computed crc: $computedCrc")
struct.getArray(PidEntriesField).map { pidEntryObj =>
val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8ab2eb/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 038c15b..bea6e9e 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -92,15 +92,21 @@ object DumpLogSegments {
for(arg <- files) {
val file = new File(arg)
- if(file.getName.endsWith(Log.LogFileSuffix)) {
- println("Dumping " + file)
- dumpLog(file, printDataLog, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser)
- } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
- println("Dumping " + file)
- dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
- } else if(file.getName.endsWith(Log.TimeIndexFileSuffix)) {
- println("Dumping " + file)
- dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
+ println(s"Dumping $file")
+
+ val filename = file.getName
+ val suffix = filename.substring(filename.lastIndexOf("."))
+ suffix match {
+ case Log.LogFileSuffix =>
+ dumpLog(file, printDataLog, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser)
+ case Log.IndexFileSuffix =>
+ dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
+ case Log.TimeIndexFileSuffix =>
+ dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
+ case Log.PidSnapshotFileSuffix =>
+ dumpPidSnapshot(file)
+ case _ =>
+ System.err.println(s"Ignoring unknown file $file")
}
}
@@ -117,7 +123,7 @@ object DumpLogSegments {
nonConsecutivePairsForLogFilesMap.foreach {
case (fileName, listOfNonConsecutivePairs) => {
- System.err.println("Non-secutive offsets in :" + fileName)
+ System.err.println("Non-consecutive offsets in :" + fileName)
listOfNonConsecutivePairs.foreach(m => {
System.err.println(" %d is followed by %d".format(m._1, m._2))
})
@@ -125,6 +131,18 @@ object DumpLogSegments {
}
}
+ private def dumpPidSnapshot(file: File): Unit = {
+ try {
+ ProducerIdMapping.readSnapshot(file).foreach { case (pid, entry) =>
+ println(s"pid: $pid epoch: ${entry.epoch} lastSequence: ${entry.lastSeq} lastOffset: ${entry.lastOffset} " +
+ s"offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp}")
+ }
+ } catch {
+ case e: CorruptSnapshotException =>
+ System.err.println(e.getMessage)
+ }
+ }
+
/* print out the contents of the index */
private def dumpIndex(file: File,
indexSanityOnly: Boolean,