Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/26 19:32:08 UTC
[kafka] branch 2.8 updated: MINOR: Add cluster-metadata-decoder to DumpLogSegments (#10212)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new f271932 MINOR: Add cluster-metadata-decoder to DumpLogSegments (#10212)
f271932 is described below
commit f27193200080dc73a9459548594a2bd12719ad4b
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Fri Feb 26 11:28:11 2021 -0800
MINOR: Add cluster-metadata-decoder to DumpLogSegments (#10212)
Add the --cluster-metadata-decoder and --skip-record-metadata options to
the DumpLogSegments command-line tool, as described in KIP-631.
Co-authored-by: David Arthur <mu...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>
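
    For reference, a minimal usage sketch (the bin/kafka-dump-log.sh wrapper drives
    kafka.tools.DumpLogSegments; the metadata log path below is a hypothetical example,
    and --print-data-log is implied whenever a decoder option is given):

        # decode cluster metadata records, printing per-record metadata as well
        bin/kafka-dump-log.sh --cluster-metadata-decoder \
          --files /tmp/kraft-combined-logs/@metadata-0/00000000000000000000.log

        # print only the decoded payloads, skipping per-record metadata
        bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata \
          --files /tmp/kraft-combined-logs/@metadata-0/00000000000000000000.log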
---
.../main/scala/kafka/tools/DumpLogSegments.scala | 90 ++++++++++++++++------
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 68 +++++++++++++++-
2 files changed, 132 insertions(+), 26 deletions(-)
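
    For illustration only: the ClusterMetadataLogMessageParser added below renders each
    decoded record as a small JSON object with "type", "version", and "data" fields. A
    hypothetical output line (record values invented for the example) might look like:

        payload: {"type":"TOPIC_RECORD","version":0,"data":{"name":"test-topic","topicId":"KbTYCRKAQLu-DyLwYbXvrQ"}}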
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 8263176..8bc07da 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -19,14 +19,18 @@ package kafka.tools
import java.io._
+import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.coordinator.group.GroupMetadataManager
import kafka.coordinator.transaction.TransactionLog
import kafka.log._
import kafka.serializer.Decoder
import kafka.utils._
import kafka.utils.Implicits._
+import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordType}
+import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.raft.metadata.MetadataRecordSerde
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -55,7 +59,7 @@ object DumpLogSegments {
suffix match {
case Log.LogFileSuffix =>
dumpLog(file, opts.shouldPrintDataLog, nonConsecutivePairsForLogFilesMap, opts.isDeepIteration,
- opts.maxMessageSize, opts.messageParser)
+ opts.maxMessageSize, opts.messageParser, opts.skipRecordMetadata)
case Log.IndexFileSuffix =>
dumpIndex(file, opts.indexSanityOnly, opts.verifyOnly, misMatchesForIndexFilesMap, opts.maxMessageSize)
case Log.TimeIndexFileSuffix =>
@@ -242,7 +246,8 @@ object DumpLogSegments {
nonConsecutivePairsForLogFilesMap: mutable.Map[String, List[(Long, Long)]],
isDeepIteration: Boolean,
maxMessageSize: Int,
- parser: MessageParser[_, _]): Unit = {
+ parser: MessageParser[_, _],
+ skipRecordMetadata: Boolean): Unit = {
val startOffset = file.getName.split("\\.")(0).toLong
println("Starting offset: " + startOffset)
val fileRecords = FileRecords.open(file, false)
@@ -263,31 +268,38 @@ object DumpLogSegments {
}
lastOffset = record.offset
- print(s"$RecordIndent offset: ${record.offset} isValid: ${record.isValid} crc: ${record.checksumOrNull}" +
- s" keySize: ${record.keySize} valueSize: ${record.valueSize} ${batch.timestampType}: ${record.timestamp}" +
- s" baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
- s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
- s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic} compressType: ${batch.compressionType} position: ${validBytes}")
-
-
- if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
- print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
- } else {
- print(s" crc: ${record.checksumOrNull} isvalid: ${record.isValid}")
- }
+ var prefix = s"${RecordIndent} "
+ if (!skipRecordMetadata) {
+ print(s"${prefix}offset: ${record.offset} isValid: ${record.isValid} crc: ${record.checksumOrNull}" +
+ s" keySize: ${record.keySize} valueSize: ${record.valueSize} ${batch.timestampType}: ${record.timestamp}" +
+ s" baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
+ s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
+ s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic} compressType: ${batch.compressionType} position: ${validBytes}")
+ prefix = " "
+
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+ print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
+ } else {
+ print(s" crc: ${record.checksumOrNull} isvalid: ${record.isValid}")
+ }
- if (batch.isControlBatch) {
- val controlTypeId = ControlRecordType.parseTypeId(record.key)
- ControlRecordType.fromTypeId(controlTypeId) match {
- case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
- val endTxnMarker = EndTransactionMarker.deserialize(record)
- print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
- case controlType =>
- print(s" controlType: $controlType($controlTypeId)")
+ if (batch.isControlBatch) {
+ val controlTypeId = ControlRecordType.parseTypeId(record.key)
+ ControlRecordType.fromTypeId(controlTypeId) match {
+ case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
+ val endTxnMarker = EndTransactionMarker.deserialize(record)
+ print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
+ case controlType =>
+ print(s" controlType: $controlType($controlTypeId)")
+ }
}
- } else if (printContents) {
+ }
+ if (printContents && !batch.isControlBatch) {
val (key, payload) = parser.parse(record)
- key.foreach(key => print(s" key: $key"))
+ key.foreach { key =>
+ print(s"${prefix}key: $key")
+ prefix = " "
+ }
payload.foreach(payload => print(s" payload: $payload"))
}
println()
@@ -382,6 +394,30 @@ object DumpLogSegments {
}
}
+ private class ClusterMetadataLogMessageParser extends MessageParser[String, String] {
+ val metadataRecordSerde = new MetadataRecordSerde()
+
+ override def parse(record: Record): (Option[String], Option[String]) = {
+ val output = try {
+ val messageAndVersion = metadataRecordSerde.
+ read(new ByteBufferAccessor(record.value), record.valueSize())
+ val json = new ObjectNode(JsonNodeFactory.instance)
+ json.set("type", new TextNode(MetadataRecordType.fromId(
+ messageAndVersion.message().apiKey()).toString))
+ json.set("version", new IntNode(messageAndVersion.version()))
+ json.set("data", MetadataJsonConverters.writeJson(
+ messageAndVersion.message(), messageAndVersion.version()))
+ json.toString()
+ } catch {
+ case e: Throwable => {
+ s"Error at ${record.offset}, skipping. ${e.getMessage}"
+ }
+ }
+ // No keys for metadata records
+ (None, Some(output))
+ }
+ }
+
private class DumpLogSegmentsOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs. Automatically set if any decoder option is specified.")
val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content.")
@@ -409,6 +445,8 @@ object DumpLogSegments {
"__consumer_offsets topic.")
val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " +
"transaction metadata from the __transaction_state topic.")
+ val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "if set, log data will be parsed as cluster metadata records.")
+ val skipRecordMetadataOpt = parser.accepts("skip-record-metadata", "whether to skip printing metadata for each record.")
options = parser.parse(args : _*)
def messageParser: MessageParser[_, _] =
@@ -416,6 +454,8 @@ object DumpLogSegments {
new OffsetsMessageParser
} else if (options.has(transactionLogOpt)) {
new TransactionLogMessageParser
+ } else if (options.has(clusterMetadataOpt)) {
+ new ClusterMetadataLogMessageParser
} else {
val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
@@ -425,9 +465,11 @@ object DumpLogSegments {
lazy val shouldPrintDataLog: Boolean = options.has(printOpt) ||
options.has(offsetsOpt) ||
options.has(transactionLogOpt) ||
+ options.has(clusterMetadataOpt) ||
options.has(valueDecoderOpt) ||
options.has(keyDecoderOpt)
+ lazy val skipRecordMetadata = options.has(skipRecordMetadataOpt)
lazy val isDeepIteration: Boolean = options.has(deepIterationOpt) || shouldPrintDataLog
lazy val verifyOnly: Boolean = options.has(verifyOpt)
lazy val indexSanityOnly: Boolean = options.has(indexSanityOpt)
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 7714c7f..1a5e51e 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -18,14 +18,21 @@
package kafka.tools
import java.io.{ByteArrayOutputStream, File, PrintWriter}
+import java.nio.ByteBuffer
+import java.util
import java.util.Properties
-import kafka.log.{Log, LogConfig, LogManager}
+import kafka.log.{Log, LogConfig, LogManager, LogTest}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
+import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.metadata.ApiMessageAndVersion
+import org.apache.kafka.raft.metadata.MetadataRecordSerde
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -55,7 +62,9 @@ class DumpLogSegmentsTest {
time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10))
+ }
+ def addSimpleRecords(): Unit = {
val now = System.currentTimeMillis()
val firstBatchRecords = (0 until 10).map { i => new SimpleRecord(now + i * 2, s"message key $i".getBytes, s"message value $i".getBytes)}
batches += BatchInfo(firstBatchRecords, true, true)
@@ -82,7 +91,7 @@ class DumpLogSegmentsTest {
@Test
def testPrintDataLog(): Unit = {
-
+ addSimpleRecords()
def verifyRecordsInOutput(checkKeysAndValues: Boolean, args: Array[String]): Unit = {
def isBatch(index: Int): Boolean = {
var i = 0
@@ -152,6 +161,7 @@ class DumpLogSegmentsTest {
@Test
def testDumpIndexMismatches(): Unit = {
+ addSimpleRecords()
val offsetMismatches = mutable.Map[String, List[(Long, Long)]]()
DumpLogSegments.dumpIndex(new File(indexFilePath), indexSanityOnly = false, verifyOnly = true, offsetMismatches,
Int.MaxValue)
@@ -160,6 +170,7 @@ class DumpLogSegmentsTest {
@Test
def testDumpTimeIndexErrors(): Unit = {
+ addSimpleRecords()
val errors = new TimeIndexDumpErrors
DumpLogSegments.dumpTimeIndex(new File(timeIndexFilePath), indexSanityOnly = false, verifyOnly = true, errors,
Int.MaxValue)
@@ -169,6 +180,59 @@ class DumpLogSegmentsTest {
}
@Test
+ def testDumpMetadataRecords(): Unit = {
+ val mockTime = new MockTime
+ val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+ val log = LogTest.createLog(logDir, logConfig, new BrokerTopicStats, mockTime.scheduler, mockTime)
+
+ val metadataRecords = Seq(
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 0.toShort),
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20), 0.toShort),
+ new ApiMessageAndVersion(
+ new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 0.toShort),
+ new ApiMessageAndVersion(
+ new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+ setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+ )
+
+ val records: Array[SimpleRecord] = metadataRecords.map(message => {
+ val serde = new MetadataRecordSerde()
+ val cache = new ObjectSerializationCache
+ val size = serde.recordSize(message, cache)
+ val buf = ByteBuffer.allocate(size)
+ val writer = new ByteBufferAccessor(buf)
+ serde.write(message, cache, writer)
+ buf.flip()
+ new SimpleRecord(null, buf.array)
+ }).toArray
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1)
+ log.flush()
+
+ var output = runDumpLogSegments(Array("--cluster-metadata-decoder", "false", "--files", logFilePath))
+ assert(output.contains("TOPIC_RECORD"))
+ assert(output.contains("BROKER_RECORD"))
+
+ output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "false", "--files", logFilePath))
+ assert(output.contains("TOPIC_RECORD"))
+ assert(output.contains("BROKER_RECORD"))
+
+ // Bogus metadata record
+ val buf = ByteBuffer.allocate(4)
+ val writer = new ByteBufferAccessor(buf)
+ writer.writeUnsignedVarint(10000)
+ writer.writeUnsignedVarint(10000)
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(null, buf.array)), leaderEpoch = 2)
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 2)
+
+ output = runDumpLogSegments(Array("--cluster-metadata-decoder", "--skip-record-metadata", "false", "--files", logFilePath))
+ assert(output.contains("TOPIC_RECORD"))
+ assert(output.contains("BROKER_RECORD"))
+ assert(output.contains("skipping"))
+ }
+
+ @Test
def testDumpEmptyIndex(): Unit = {
val indexFile = new File(indexFilePath)
new PrintWriter(indexFile).close()