Posted to commits@kafka.apache.org by jg...@apache.org on 2018/12/04 17:04:55 UTC
[kafka] branch trunk updated: KAFKA-7687; Print batch level information in DumpLogSegments when deep iterating (#5976)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f65b1c4 KAFKA-7687; Print batch level information in DumpLogSegments when deep iterating (#5976)
f65b1c4 is described below
commit f65b1c4796fdf34a883e0228f7b74a0fe36dd4a8
Author: huxi <hu...@hotmail.com>
AuthorDate: Wed Dec 5 01:04:39 2018 +0800
KAFKA-7687; Print batch level information in DumpLogSegments when deep iterating (#5976)
DumpLogSegments should print batch level information when deep-iteration is specified.
Reviewers: Jason Gustafson <ja...@confluent.io>
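For context, the new output can be exercised by running the tool directly. A minimal invocation (the segment path below is a hypothetical example; --files and --deep-iteration are the tool's existing options):

  bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
    --files /tmp/kafka-logs/test-0/00000000000000000000.log \
    --deep-iteration

With this patch, each batch prints one batch-level line followed by its records, each record line prefixed with the new RECORD_INDENT marker ("|"). Illustrative output for a two-record v2 batch, assembled from the format strings in the diff below (field values are examples, not captured output):

  baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1543942000000 size: 80 magic: 2 compresscodec: NONE crc: 1234567890 isvalid: true
  | offset: 0 CreateTime: 1543942000000 keysize: -1 valuesize: 5 sequence: -1 headerKeys: []
  | offset: 1 CreateTime: 1543942000000 keysize: -1 valuesize: 5 sequence: -1 headerKeys: []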
---
.../main/scala/kafka/tools/DumpLogSegments.scala | 45 +++++++++++-----------
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 17 +++++---
2 files changed, 34 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 281e920..64eda92 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -36,6 +36,9 @@ import scala.collection.JavaConverters._
object DumpLogSegments {
+ // visible for testing
+ private[tools] val RECORD_INDENT = "|"
+
def main(args: Array[String]) {
val opts = new DumpLogSegmentsOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
@@ -277,6 +280,7 @@ object DumpLogSegments {
var lastOffset = -1L
for (batch <- messageSet.batches.asScala) {
+ printBatchLevel(batch, validBytes)
if (isDeepIteration) {
for (record <- batch.asScala) {
if (lastOffset == -1)
@@ -288,17 +292,13 @@ object DumpLogSegments {
}
lastOffset = record.offset
- print("offset: " + record.offset + " position: " + validBytes +
- " " + batch.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
- " keysize: " + record.keySize + " valuesize: " + record.valueSize + " magic: " + batch.magic +
- " compresscodec: " + batch.compressionType)
+ print(s"$RECORD_INDENT offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " +
+ s"keysize: ${record.keySize} valuesize: ${record.valueSize}")
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
- print(" producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch + " sequence: " + record.sequence +
- " isTransactional: " + batch.isTransactional +
- " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
+ print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
} else {
- print(" crc: " + record.checksumOrNull)
+ print(s" crc: ${record.checksumOrNull} isvalid: ${record.isValid}")
}
if (batch.isControlBatch) {
@@ -317,20 +317,6 @@ object DumpLogSegments {
}
println()
}
- } else {
- if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
- print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset + " count: " + batch.countOrNull +
- " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
- " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
- " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
- " isControl: " + batch.isControlBatch)
- else
- print("offset: " + batch.lastOffset)
-
- println(" position: " + validBytes + " " + batch.timestampType + ": " + batch.maxTimestamp +
- " isvalid: " + batch.isValid +
- " size: " + batch.sizeInBytes + " magic: " + batch.magic +
- " compresscodec: " + batch.compressionType + " crc: " + batch.checksum)
}
validBytes += batch.sizeInBytes
}
@@ -339,6 +325,21 @@ object DumpLogSegments {
println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
}
+ private def printBatchLevel(batch: FileLogInputStream.FileChannelRecordBatch, accumulativeBytes: Long): Unit = {
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+ print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset + " count: " + batch.countOrNull +
+ " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
+ " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
+ " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
+ " isControl: " + batch.isControlBatch)
+ else
+ print("offset: " + batch.lastOffset)
+
+ println(" position: " + accumulativeBytes + " " + batch.timestampType + ": " + batch.maxTimestamp +
+ " size: " + batch.sizeInBytes + " magic: " + batch.magic +
+ " compresscodec: " + batch.compressionType + " crc: " + batch.checksum + " isvalid: " + batch.isValid)
+ }
+
class TimeIndexDumpErrors {
val misMatchesForTimeIndexFilesMap = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
val outOfOrderTimestamp = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 01f0010..2ee90ad 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -41,7 +41,9 @@ class DumpLogSegmentsTest {
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10))
- /* append two messages */
+ /* append four messages */
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+ new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
log.flush()
@@ -59,16 +61,19 @@ class DumpLogSegmentsTest {
val output = runDumpLogSegments(args)
val lines = output.split("\n")
assertTrue(s"Data not printed: $output", lines.length > 2)
- // Verify that the last two lines are message records
- (0 until 2).foreach { i =>
- val line = lines(lines.length - 2 + i)
- assertTrue(s"Not a valid message record: $line", line.startsWith(s"offset: $i position:"))
+ // For every three lines of output, verify that the first line is a batch-level record and the next two lines are message records
+ (0 until 6).foreach { i =>
+ val line = lines(lines.length - 6 + i)
+ if (i % 3 == 0)
+ assertTrue(s"Not a valid batch-level message record: $line", line.startsWith(s"baseOffset: ${i / 3 * 2} lastOffset: "))
+ else
+ assertTrue(s"Not a valid message record: $line", line.startsWith(s"${DumpLogSegments.RECORD_INDENT} offset: ${i - 1 - i / 3}"))
}
}
def verifyNoRecordsInOutput(args: Array[String]): Unit = {
val output = runDumpLogSegments(args)
- assertFalse(s"Data should not have been printed: $output", output.matches("(?s).*offset: [0-9]* position.*"))
+ assertFalse(s"Data should not have been printed: $output", output.matches("(?s).*offset: [0-9]* isvalid.*"))
}
// Verify that records are printed with --print-data-log even if --deep-iteration is not specified
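The --print-data-log case referenced in the comment above can be exercised the same way, swapping in that flag (again with a hypothetical segment path):

  bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
    --files /tmp/kafka-logs/test-0/00000000000000000000.log \
    --print-data-log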