Posted to commits@kafka.apache.org by jg...@apache.org on 2018/12/04 17:04:55 UTC

[kafka] branch trunk updated: KAFKA-7687; Print batch level information in DumpLogSegments when deep iterating (#5976)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f65b1c4  KAFKA-7687; Print batch level information in DumpLogSegments when deep iterating (#5976)
f65b1c4 is described below

commit f65b1c4796fdf34a883e0228f7b74a0fe36dd4a8
Author: huxi <hu...@hotmail.com>
AuthorDate: Wed Dec 5 01:04:39 2018 +0800

    KAFKA-7687; Print batch level information in DumpLogSegments when deep iterating (#5976)
    
    DumpLogSegments should print batch level information when deep-iteration is specified.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/tools/DumpLogSegments.scala   | 45 +++++++++++-----------
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     | 17 +++++---
 2 files changed, 34 insertions(+), 28 deletions(-)
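
As context for reading the patch below, here is a minimal usage sketch (not part of the commit). The invocation and every field value in the comments are illustrative assumptions; only the --deep-iteration behaviour and the line formats are taken from the code in this diff, and the --files flag and segment path are not shown in it.

    // Hypothetical invocation from Scala; the tool is normally run on the
    // command line via kafka-run-class.sh kafka.tools.DumpLogSegments.
    DumpLogSegments.main(Array(
      "--deep-iteration",
      "--files", "/tmp/kafka-logs/test-0/00000000000000000000.log"))

    // After this change the tool prints one batch-level line per batch,
    // followed (when deep iterating) by that batch's records prefixed with
    // RECORD_INDENT ("|"). Roughly, with made-up values, each batch looks like:
    //
    //   baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1543942079000 size: 88 magic: 2 compresscodec: NONE crc: 123456789 isvalid: true
    //   | offset: 0 CreateTime: 1543942079000 keysize: -1 valuesize: 5 sequence: -1 headerKeys: []
    //   | offset: 1 CreateTime: 1543942079000 keysize: -1 valuesize: 5 sequence: -1 headerKeys: []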

diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 281e920..64eda92 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -36,6 +36,9 @@ import scala.collection.JavaConverters._
 
 object DumpLogSegments {
 
+  // visible for testing
+  private[tools] val RECORD_INDENT = "|"
+
   def main(args: Array[String]) {
     val opts = new DumpLogSegmentsOptions(args)
     CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
@@ -277,6 +280,7 @@ object DumpLogSegments {
     var lastOffset = -1L
 
     for (batch <- messageSet.batches.asScala) {
+      printBatchLevel(batch, validBytes)
       if (isDeepIteration) {
         for (record <- batch.asScala) {
           if (lastOffset == -1)
@@ -288,17 +292,13 @@ object DumpLogSegments {
           }
           lastOffset = record.offset
 
-          print("offset: " + record.offset + " position: " + validBytes +
-            " " + batch.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
-            " keysize: " + record.keySize + " valuesize: " + record.valueSize + " magic: " + batch.magic +
-            " compresscodec: " + batch.compressionType)
+          print(s"$RECORD_INDENT offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " +
+            s"keysize: ${record.keySize} valuesize: ${record.valueSize}")
 
           if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            print(" producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch + " sequence: " + record.sequence +
-              " isTransactional: " + batch.isTransactional +
-              " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
+            print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
           } else {
-            print(" crc: " + record.checksumOrNull)
+            print(s" crc: ${record.checksumOrNull} isvalid: ${record.isValid}")
           }
 
           if (batch.isControlBatch) {
@@ -317,20 +317,6 @@ object DumpLogSegments {
           }
           println()
         }
-      } else {
-        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset + " count: " + batch.countOrNull +
-            " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
-            " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
-            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
-            " isControl: " + batch.isControlBatch)
-        else
-          print("offset: " + batch.lastOffset)
-
-        println(" position: " + validBytes + " " + batch.timestampType + ": " + batch.maxTimestamp +
-          " isvalid: " + batch.isValid +
-          " size: " + batch.sizeInBytes + " magic: " + batch.magic +
-          " compresscodec: " + batch.compressionType + " crc: " + batch.checksum)
       }
       validBytes += batch.sizeInBytes
     }
@@ -339,6 +325,21 @@ object DumpLogSegments {
       println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
 
+  private def printBatchLevel(batch: FileLogInputStream.FileChannelRecordBatch, accumulativeBytes: Long): Unit = {
+    if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+      print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset + " count: " + batch.countOrNull +
+        " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
+        " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
+        " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
+        " isControl: " + batch.isControlBatch)
+    else
+      print("offset: " + batch.lastOffset)
+
+    println(" position: " + accumulativeBytes + " " + batch.timestampType + ": " + batch.maxTimestamp +
+      " size: " + batch.sizeInBytes + " magic: " + batch.magic +
+      " compresscodec: " + batch.compressionType + " crc: " + batch.checksum + " isvalid: " + batch.isValid)
+  }
+
   class TimeIndexDumpErrors {
     val misMatchesForTimeIndexFilesMap = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
     val outOfOrderTimestamp = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
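
With this change the batch-level line is printed unconditionally by the new printBatchLevel helper, while record-level lines appear only when deep iterating and carry the RECORD_INDENT prefix. As a hedged sketch (not part of the commit), code in the kafka.tools package could use that marker to split captured output into the two kinds of lines; the output value below is an assumed String holding the tool's captured stdout:

    // Sketch only: assumes this code lives in package kafka.tools (so the
    // private[tools] RECORD_INDENT constant is visible) and that `output`
    // contains the dumped text.
    val (recordLines, batchLines) =
      output.split("\n").partition(_.startsWith(s"${DumpLogSegments.RECORD_INDENT} "))

The test diff below relies on the same marker in its startsWith assertions.
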
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 01f0010..2ee90ad 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -41,7 +41,9 @@ class DumpLogSegmentsTest {
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10))
 
-    /* append two messages */
+    /* append four messages */
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
       new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
     log.flush()
@@ -59,16 +61,19 @@ class DumpLogSegmentsTest {
       val output = runDumpLogSegments(args)
       val lines = output.split("\n")
       assertTrue(s"Data not printed: $output", lines.length > 2)
-      // Verify that the last two lines are message records
-      (0 until 2).foreach { i =>
-        val line = lines(lines.length - 2 + i)
-        assertTrue(s"Not a valid message record: $line", line.startsWith(s"offset: $i position:"))
+      // For each group of three output lines, verify that the first is a batch-level line and the next two are record lines
+      (0 until 6).foreach { i =>
+        val line = lines(lines.length - 6 + i)
+        if (i % 3 == 0)
+          assertTrue(s"Not a valid batch-level message record: $line", line.startsWith(s"baseOffset: ${i / 3 * 2} lastOffset: "))
+        else
+          assertTrue(s"Not a valid message record: $line", line.startsWith(s"${DumpLogSegments.RECORD_INDENT} offset: ${i - 1 - i / 3}"))
       }
     }
 
     def verifyNoRecordsInOutput(args: Array[String]): Unit = {
       val output = runDumpLogSegments(args)
-      assertFalse(s"Data should not have been printed: $output", output.matches("(?s).*offset: [0-9]* position.*"))
+      assertFalse(s"Data should not have been printed: $output", output.matches("(?s).*offset: [0-9]* isvalid.*"))
     }
 
     // Verify that records are printed with --print-data-log even if --deep-iteration is not specified