You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/26 19:32:08 UTC

[kafka] branch 2.8 updated: MINOR: Add cluster-metadata-decoder to DumpLogSegments (#10212)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new f271932  MINOR: Add cluster-metadata-decoder to DumpLogSegments (#10212)
f271932 is described below

commit f27193200080dc73a9459548594a2bd12719ad4b
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Fri Feb 26 11:28:11 2021 -0800

    MINOR: Add cluster-metadata-decoder to DumpLogSegments (#10212)
    
    Add the --cluster-metadata-decoder and --skip-record-metadata options to
    the DumpLogSegments command-line tool, as described in KIP-631.
    
    Co-authored-by: David Arthur <mu...@gmail.com>
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/tools/DumpLogSegments.scala   | 90 ++++++++++++++++------
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     | 68 +++++++++++++++-
 2 files changed, 132 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 8263176..8bc07da 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -19,14 +19,18 @@ package kafka.tools
 
 import java.io._
 
+import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
 import kafka.coordinator.group.GroupMetadataManager
 import kafka.coordinator.transaction.TransactionLog
 import kafka.log._
 import kafka.serializer.Decoder
 import kafka.utils._
 import kafka.utils.Implicits._
+import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordType}
+import org.apache.kafka.common.protocol.ByteBufferAccessor
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.raft.metadata.MetadataRecordSerde
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
@@ -55,7 +59,7 @@ object DumpLogSegments {
       suffix match {
         case Log.LogFileSuffix =>
           dumpLog(file, opts.shouldPrintDataLog, nonConsecutivePairsForLogFilesMap, opts.isDeepIteration,
-            opts.maxMessageSize, opts.messageParser)
+            opts.maxMessageSize, opts.messageParser, opts.skipRecordMetadata)
         case Log.IndexFileSuffix =>
           dumpIndex(file, opts.indexSanityOnly, opts.verifyOnly, misMatchesForIndexFilesMap, opts.maxMessageSize)
         case Log.TimeIndexFileSuffix =>
@@ -242,7 +246,8 @@ object DumpLogSegments {
                       nonConsecutivePairsForLogFilesMap: mutable.Map[String, List[(Long, Long)]],
                       isDeepIteration: Boolean,
                       maxMessageSize: Int,
-                      parser: MessageParser[_, _]): Unit = {
+                      parser: MessageParser[_, _],
+                      skipRecordMetadata: Boolean): Unit = {
     val startOffset = file.getName.split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val fileRecords = FileRecords.open(file, false)
@@ -263,31 +268,38 @@ object DumpLogSegments {
             }
             lastOffset = record.offset
 
-            print(s"$RecordIndent offset: ${record.offset} isValid: ${record.isValid} crc: ${record.checksumOrNull}" +
-                s" keySize: ${record.keySize} valueSize: ${record.valueSize} ${batch.timestampType}: ${record.timestamp}" +
-                s" baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
-                s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
-                s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic} compressType: ${batch.compressionType} position: ${validBytes}")
-
-
-            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
-              print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
-            } else {
-              print(s" crc: ${record.checksumOrNull} isvalid: ${record.isValid}")
-            }
+            var prefix = s"${RecordIndent} "
+            if (!skipRecordMetadata) {
+              print(s"${prefix}offset: ${record.offset} isValid: ${record.isValid} crc: ${record.checksumOrNull}" +
+                  s" keySize: ${record.keySize} valueSize: ${record.valueSize} ${batch.timestampType}: ${record.timestamp}" +
+                  s" baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
+                  s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
+                  s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic} compressType: ${batch.compressionType} position: ${validBytes}")
+              prefix = " "
+
+              if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+                print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
+              } else {
+                print(s" crc: ${record.checksumOrNull} isvalid: ${record.isValid}")
+              }
 
-            if (batch.isControlBatch) {
-              val controlTypeId = ControlRecordType.parseTypeId(record.key)
-              ControlRecordType.fromTypeId(controlTypeId) match {
-                case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
-                  val endTxnMarker = EndTransactionMarker.deserialize(record)
-                  print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
-                case controlType =>
-                  print(s" controlType: $controlType($controlTypeId)")
+              if (batch.isControlBatch) {
+                val controlTypeId = ControlRecordType.parseTypeId(record.key)
+                ControlRecordType.fromTypeId(controlTypeId) match {
+                  case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
+                    val endTxnMarker = EndTransactionMarker.deserialize(record)
+                    print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
+                  case controlType =>
+                    print(s" controlType: $controlType($controlTypeId)")
+                }
               }
-            } else if (printContents) {
+            }
+            if (printContents && !batch.isControlBatch) {
               val (key, payload) = parser.parse(record)
-              key.foreach(key => print(s" key: $key"))
+              key.foreach { key =>
+                print(s"${prefix}key: $key")
+                prefix = " "
+              }
               payload.foreach(payload => print(s" payload: $payload"))
             }
             println()
@@ -382,6 +394,30 @@ object DumpLogSegments {
     }
   }
 
+  private class ClusterMetadataLogMessageParser extends MessageParser[String, String] {
+    val metadataRecordSerde = new MetadataRecordSerde()
+
+    override def parse(record: Record): (Option[String], Option[String]) = {
+      val output = try {
+        val messageAndVersion = metadataRecordSerde.
+          read(new ByteBufferAccessor(record.value), record.valueSize())
+        val json = new ObjectNode(JsonNodeFactory.instance)
+        json.set("type", new TextNode(MetadataRecordType.fromId(
+          messageAndVersion.message().apiKey()).toString))
+        json.set("version", new IntNode(messageAndVersion.version()))
+        json.set("data", MetadataJsonConverters.writeJson(
+          messageAndVersion.message(), messageAndVersion.version()))
+        json.toString()
+      } catch {
+        case e: Throwable => {
+          s"Error at ${record.offset}, skipping. ${e.getMessage}"
+        }
+      }
+      // No keys for metadata records
+      (None, Some(output))
+    }
+  }
+
   private class DumpLogSegmentsOptions(args: Array[String]) extends CommandDefaultOptions(args) {
     val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs. Automatically set if any decoder option is specified.")
     val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content.")
@@ -409,6 +445,8 @@ object DumpLogSegments {
       "__consumer_offsets topic.")
     val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " +
       "transaction metadata from the __transaction_state topic.")
+    val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "if set, log data will be parsed as cluster metadata records.")
+    val skipRecordMetadataOpt = parser.accepts("skip-record-metadata", "whether to skip printing metadata for each record.")
     options = parser.parse(args : _*)
 
     def messageParser: MessageParser[_, _] =
@@ -416,6 +454,8 @@ object DumpLogSegments {
         new OffsetsMessageParser
       } else if (options.has(transactionLogOpt)) {
         new TransactionLogMessageParser
+      } else if (options.has(clusterMetadataOpt)) {
+        new ClusterMetadataLogMessageParser
       } else {
         val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
         val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
@@ -425,9 +465,11 @@ object DumpLogSegments {
     lazy val shouldPrintDataLog: Boolean = options.has(printOpt) ||
       options.has(offsetsOpt) ||
       options.has(transactionLogOpt) ||
+      options.has(clusterMetadataOpt) ||
       options.has(valueDecoderOpt) ||
       options.has(keyDecoderOpt)
 
+    lazy val skipRecordMetadata = options.has(skipRecordMetadataOpt)
     lazy val isDeepIteration: Boolean = options.has(deepIterationOpt) || shouldPrintDataLog
     lazy val verifyOnly: Boolean = options.has(verifyOpt)
     lazy val indexSanityOnly: Boolean = options.has(indexSanityOpt)
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 7714c7f..1a5e51e 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -18,14 +18,21 @@
 package kafka.tools
 
 import java.io.{ByteArrayOutputStream, File, PrintWriter}
+import java.nio.ByteBuffer
+import java.util
 import java.util.Properties
 
-import kafka.log.{Log, LogConfig, LogManager}
+import kafka.log.{Log, LogConfig, LogManager, LogTest}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
 import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
+import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.metadata.ApiMessageAndVersion
+import org.apache.kafka.raft.metadata.MetadataRecordSerde
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
@@ -55,7 +62,9 @@ class DumpLogSegmentsTest {
       time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10))
+  }
 
+  def addSimpleRecords(): Unit = {
     val now = System.currentTimeMillis()
     val firstBatchRecords = (0 until 10).map { i => new SimpleRecord(now + i * 2, s"message key $i".getBytes, s"message value $i".getBytes)}
     batches += BatchInfo(firstBatchRecords, true, true)
@@ -82,7 +91,7 @@ class DumpLogSegmentsTest {
 
   @Test
   def testPrintDataLog(): Unit = {
-
+    addSimpleRecords()
     def verifyRecordsInOutput(checkKeysAndValues: Boolean, args: Array[String]): Unit = {
       def isBatch(index: Int): Boolean = {
         var i = 0
@@ -152,6 +161,7 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpIndexMismatches(): Unit = {
+    addSimpleRecords()
     val offsetMismatches = mutable.Map[String, List[(Long, Long)]]()
     DumpLogSegments.dumpIndex(new File(indexFilePath), indexSanityOnly = false, verifyOnly = true, offsetMismatches,
       Int.MaxValue)
@@ -160,6 +170,7 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpTimeIndexErrors(): Unit = {
+    addSimpleRecords()
     val errors = new TimeIndexDumpErrors
     DumpLogSegments.dumpTimeIndex(new File(timeIndexFilePath), indexSanityOnly = false, verifyOnly = true, errors,
       Int.MaxValue)
@@ -169,6 +180,59 @@ class DumpLogSegmentsTest {
   }
 
   @Test
+  def testDumpMetadataRecords(): Unit = {
+    val mockTime = new MockTime
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = LogTest.createLog(logDir, logConfig, new BrokerTopicStats, mockTime.scheduler, mockTime)
+
+    val metadataRecords = Seq(
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 0.toShort),
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20), 0.toShort),
+      new ApiMessageAndVersion(
+        new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 0.toShort),
+      new ApiMessageAndVersion(
+        new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+          setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+    )
+
+    val records: Array[SimpleRecord] = metadataRecords.map(message => {
+      val serde = new MetadataRecordSerde()
+      val cache = new ObjectSerializationCache
+      val size = serde.recordSize(message, cache)
+      val buf = ByteBuffer.allocate(size)
+      val writer = new ByteBufferAccessor(buf)
+      serde.write(message, cache, writer)
+      buf.flip()
+      new SimpleRecord(null, buf.array)
+    }).toArray
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1)
+    log.flush()
+
+    var output = runDumpLogSegments(Array("--cluster-metadata-decoder", "false", "--files", logFilePath))
+    assert(output.contains("TOPIC_RECORD"))
+    assert(output.contains("BROKER_RECORD"))
+
+    output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "false", "--files", logFilePath))
+    assert(output.contains("TOPIC_RECORD"))
+    assert(output.contains("BROKER_RECORD"))
+
+    // Bogus metadata record
+    val buf = ByteBuffer.allocate(4)
+    val writer = new ByteBufferAccessor(buf)
+    writer.writeUnsignedVarint(10000)
+    writer.writeUnsignedVarint(10000)
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(null, buf.array)), leaderEpoch = 2)
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 2)
+
+    output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "false", "--files", logFilePath))
+    assert(output.contains("TOPIC_RECORD"))
+    assert(output.contains("BROKER_RECORD"))
+    assert(output.contains("skipping"))
+  }
+
+  @Test
   def testDumpEmptyIndex(): Unit = {
     val indexFile = new File(indexFilePath)
     new PrintWriter(indexFile).close()