Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/16 23:25:27 UTC

[kafka] branch 1.1 updated: MINOR: Enable deep-iteration to print data in DumpLogSegments (#4396)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 62acb0a  MINOR: Enable deep-iteration to print data in DumpLogSegments (#4396)
62acb0a is described below

commit 62acb0add5442220b65a4e8cef85c1c48fb89b8b
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Feb 16 23:20:16 2018 +0000

    MINOR: Enable deep-iteration to print data in DumpLogSegments (#4396)
    
    Enable the deep-iteration option when print-data-log is enabled in DumpLogSegments; otherwise record data is not printed.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  4 +-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     | 96 ++++++++++++++++++++++
 docs/upgrade.html                                  |  2 +
 3 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f6e804d..2fc203a 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -52,7 +52,7 @@ object DumpLogSegments {
                                   .describedAs("size")
                                   .ofType(classOf[java.lang.Integer])
                                   .defaultsTo(5 * 1024 * 1024)
-    val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration.")
+    val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.")
     val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
                                .withOptionalArg()
                                .ofType(classOf[java.lang.String])
@@ -85,7 +85,7 @@ object DumpLogSegments {
 
     val files = options.valueOf(filesOpt).split(",")
     val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
-    val isDeepIteration = options.has(deepIterationOpt)
+    val isDeepIteration = options.has(deepIterationOpt) || printDataLog
 
     val messageParser = if (options.has(offsetsOpt)) {
       new OffsetsMessageParser
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
new file mode 100644
index 0000000..01f0010
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.ByteArrayOutputStream
+
+import kafka.log.{ Log, LogConfig, LogManager }
+import kafka.server.{ BrokerTopicStats, LogDirFailureChannel }
+import kafka.utils.{ MockTime, TestUtils }
+import org.apache.kafka.common.record.{ CompressionType, MemoryRecords, SimpleRecord }
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit.{ After, Before, Test }
+
+class DumpLogSegmentsTest {
+
+  val tmpDir = TestUtils.tempDir()
+  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+  val logFile = s"$logDir/00000000000000000000.log"
+  val time = new MockTime(0, 0)
+
+  @Before
+  def setUp(): Unit = {
+    val log = Log(logDir, LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10))
+
+    /* append two messages */
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
+    log.flush()
+  }
+
+  @After
+  def tearDown(): Unit = {
+    Utils.delete(tmpDir)
+  }
+
+  @Test
+  def testPrintDataLog(): Unit = {
+
+    def verifyRecordsInOutput(args: Array[String]): Unit = {
+      val output = runDumpLogSegments(args)
+      val lines = output.split("\n")
+      assertTrue(s"Data not printed: $output", lines.length > 2)
+      // Verify that the last two lines are message records
+      (0 until 2).foreach { i =>
+        val line = lines(lines.length - 2 + i)
+        assertTrue(s"Not a valid message record: $line", line.startsWith(s"offset: $i position:"))
+      }
+    }
+
+    def verifyNoRecordsInOutput(args: Array[String]): Unit = {
+      val output = runDumpLogSegments(args)
+      assertFalse(s"Data should not have been printed: $output", output.matches("(?s).*offset: [0-9]* position.*"))
+    }
+
+    // Verify that records are printed with --print-data-log even if --deep-iteration is not specified
+    verifyRecordsInOutput(Array("--print-data-log", "--files", logFile))
+    // Verify that records are printed with --print-data-log if --deep-iteration is also specified
+    verifyRecordsInOutput(Array("--print-data-log", "--deep-iteration", "--files", logFile))
+    // Verify that records are printed with --value-decoder even if --print-data-log is not specified
+    verifyRecordsInOutput(Array("--value-decoder-class", "kafka.serializer.StringDecoder", "--files", logFile))
+    // Verify that records are printed with --key-decoder even if --print-data-log is not specified
+    verifyRecordsInOutput(Array("--key-decoder-class", "kafka.serializer.StringDecoder", "--files", logFile))
+    // Verify that records are printed with --deep-iteration even if --print-data-log is not specified
+    verifyRecordsInOutput(Array("--deep-iteration", "--files", logFile))
+
+    // Verify that records are not printed by default
+    verifyNoRecordsInOutput(Array("--files", logFile))
+  }
+
+  private def runDumpLogSegments(args: Array[String]): String = {
+    val outContent = new ByteArrayOutputStream
+    Console.withOut(outContent) {
+      DumpLogSegments.main(args)
+    }
+    outContent.toString
+  }
+}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b976173..b3ae68f 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -75,6 +75,8 @@
 	fine-grained timeouts (instead of hard coded retries as in older version).</li>
     <li>Kafka Streams rebalance time was reduced further making Kafka Streams more responsive.</li>
     <li>Kafka Connect now supports message headers in both sink and source connectors, and to manipulate them via simple message transforms. Connectors must be changed to explicitly use them. A new <code>HeaderConverter</code> is introduced to control how headers are (de)serialized, and the new "SimpleHeaderConverter" is used by default to use string representations of values.</li>
+    <li>kafka.tools.DumpLogSegments now automatically sets the deep-iteration option when print-data-log is
+        enabled, either explicitly or implicitly via any of the other options such as the decoder options.</li>
 </ul>
 
 <h5><a id="upgrade_110_new_protocols" href="#upgrade_110_new_protocols">New Protocol Versions</a></h5>
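
For illustration, a minimal sketch (not part of the commit) of the user-visible effect: after this change, passing only --print-data-log is enough to print record data, because deep iteration is enabled implicitly. The log file path below is hypothetical, and the Kafka core classes are assumed to be on the classpath.

    // Minimal sketch (not part of the commit): drive the tool programmatically,
    // the same way DumpLogSegmentsTest does, to observe the new flag handling.
    object PrintDataLogExample {
      def main(args: Array[String]): Unit = {
        val logFile = "/tmp/kafka-logs/my-topic-0/00000000000000000000.log" // hypothetical path
        // Before this patch, --deep-iteration also had to be given for record data to appear;
        // now --print-data-log (or a decoder option) implies it.
        kafka.tools.DumpLogSegments.main(Array("--print-data-log", "--files", logFile))
      }
    }

The same arguments can be passed on the command line by invoking kafka.tools.DumpLogSegments through bin/kafka-run-class.sh.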

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.