You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/04/26 20:46:41 UTC

kafka git commit: KAFKA-5108; Add support for reading PID snapshot files to DumpLogSegments

Repository: kafka
Updated Branches:
  refs/heads/trunk d06f2cc7a -> 8d8ab2ebc


KAFKA-5108; Add support for reading PID snapshot files to DumpLogSegments

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2922 from hachikuji/KAFKA-5108


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d8ab2eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d8ab2eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d8ab2eb

Branch: refs/heads/trunk
Commit: 8d8ab2ebcd498982a9dd6be873f4d7cc032c65d8
Parents: d06f2cc
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Apr 26 13:46:35 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Apr 26 13:46:35 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/log/ProducerIdMapping.scala     |  6 ++--
 .../scala/kafka/tools/DumpLogSegments.scala     | 38 ++++++++++++++------
 2 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8ab2eb/core/src/main/scala/kafka/log/ProducerIdMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
index fc6ff0b..bcadce5 100644
--- a/core/src/main/scala/kafka/log/ProducerIdMapping.scala
+++ b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
@@ -95,7 +95,7 @@ private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEnt
     ProducerIdEntry(epoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp)
 }
 
-private[log] class CorruptSnapshotException(msg: String) extends KafkaException(msg)
+class CorruptSnapshotException(msg: String) extends KafkaException(msg)
 
 object ProducerIdMapping {
   private val PidSnapshotVersion: Short = 1
@@ -127,7 +127,7 @@ object ProducerIdMapping {
     new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
     new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
 
-  private def readSnapshot(file: File): Iterable[(Long, ProducerIdEntry)] = {
+  def readSnapshot(file: File): Iterable[(Long, ProducerIdEntry)] = {
     val buffer = Files.readAllBytes(file.toPath)
     val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
 
@@ -138,7 +138,7 @@ object ProducerIdMapping {
     val crc = struct.getUnsignedInt(CrcField)
     val computedCrc =  Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
     if (crc != computedCrc)
-      throw new CorruptSnapshotException(s"Snapshot file is corrupted (CRC is no longer valid). Stored crc: ${crc}. Computed crc: ${computedCrc}")
+      throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). Stored crc: $crc. Computed crc: $computedCrc")
 
     struct.getArray(PidEntriesField).map { pidEntryObj =>
       val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d8ab2eb/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 038c15b..bea6e9e 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -92,15 +92,21 @@ object DumpLogSegments {
 
     for(arg <- files) {
       val file = new File(arg)
-      if(file.getName.endsWith(Log.LogFileSuffix)) {
-        println("Dumping " + file)
-        dumpLog(file, printDataLog, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser)
-      } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
-        println("Dumping " + file)
-        dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
-      } else if(file.getName.endsWith(Log.TimeIndexFileSuffix)) {
-        println("Dumping " + file)
-        dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
+      println(s"Dumping $file")
+
+      val filename = file.getName
+      val suffix = filename.substring(filename.lastIndexOf("."))
+      suffix match {
+        case Log.LogFileSuffix =>
+          dumpLog(file, printDataLog, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser)
+        case Log.IndexFileSuffix =>
+          dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
+        case Log.TimeIndexFileSuffix =>
+          dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
+        case Log.PidSnapshotFileSuffix =>
+          dumpPidSnapshot(file)
+        case _ =>
+          System.err.println(s"Ignoring unknown file $file")
       }
     }
 
@@ -117,7 +123,7 @@ object DumpLogSegments {
 
     nonConsecutivePairsForLogFilesMap.foreach {
       case (fileName, listOfNonConsecutivePairs) => {
-        System.err.println("Non-secutive offsets in :" + fileName)
+        System.err.println("Non-consecutive offsets in :" + fileName)
         listOfNonConsecutivePairs.foreach(m => {
           System.err.println("  %d is followed by %d".format(m._1, m._2))
         })
@@ -125,6 +131,18 @@ object DumpLogSegments {
     }
   }
 
+  private def dumpPidSnapshot(file: File): Unit = {
+    try {
+      ProducerIdMapping.readSnapshot(file).foreach { case (pid, entry) =>
+        println(s"pid: $pid epoch: ${entry.epoch} lastSequence: ${entry.lastSeq} lastOffset: ${entry.lastOffset} " +
+          s"offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp}")
+      }
+    } catch {
+      case e: CorruptSnapshotException =>
+        System.err.println(e.getMessage)
+    }
+  }
+
   /* print out the contents of the index */
   private def dumpIndex(file: File,
                         indexSanityOnly: Boolean,