Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/16 23:25:27 UTC
[kafka] branch 1.1 updated: MINOR: Enable deep-iteration to print data in DumpLogSegments (#4396)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 62acb0a MINOR: Enable deep-iteration to print data in DumpLogSegments (#4396)
62acb0a is described below
commit 62acb0add5442220b65a4e8cef85c1c48fb89b8b
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Feb 16 23:20:16 2018 +0000
MINOR: Enable deep-iteration to print data in DumpLogSegments (#4396)
Enable the deep-iteration option when print-data-log is enabled in DumpLogSegments; otherwise, no data is printed.
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
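With this change, passing --print-data-log alone is enough to see record contents; previously --deep-iteration also had to be supplied. An illustrative invocation (the segment path is an example, not taken from this commit):

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
        --print-data-log \
        --files /tmp/kafka-logs/test-0/00000000000000000000.log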
---
.../main/scala/kafka/tools/DumpLogSegments.scala | 4 +-
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 96 ++++++++++++++++++++++
docs/upgrade.html | 2 +
3 files changed, 100 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f6e804d..2fc203a 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -52,7 +52,7 @@ object DumpLogSegments {
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(5 * 1024 * 1024)
- val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration.")
+ val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.")
val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
.withOptionalArg()
.ofType(classOf[java.lang.String])
@@ -85,7 +85,7 @@ object DumpLogSegments {
val files = options.valueOf(filesOpt).split(",")
val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
- val isDeepIteration = options.has(deepIterationOpt)
+ val isDeepIteration = options.has(deepIterationOpt) || printDataLog
val messageParser = if (options.has(offsetsOpt)) {
new OffsetsMessageParser
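For context, print-data-log is itself implied by several other options, so deep iteration is now enabled transitively for those as well. A simplified Scala sketch of the resolution logic (printOpt and keyDecoderOpt are paraphrased names; the exact set of implying options follows the 1.1 sources only approximately):

    // Any of these options implies printing record data (names approximate).
    val printDataLog = options.has(printOpt) ||
      options.has(offsetsOpt) ||
      options.has(valueDecoderOpt) ||
      options.has(keyDecoderOpt)
    // Printing record data in turn implies deep iteration (the line added in this commit).
    val isDeepIteration = options.has(deepIterationOpt) || printDataLog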
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
new file mode 100644
index 0000000..01f0010
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.ByteArrayOutputStream
+
+import kafka.log.{ Log, LogConfig, LogManager }
+import kafka.server.{ BrokerTopicStats, LogDirFailureChannel }
+import kafka.utils.{ MockTime, TestUtils }
+import org.apache.kafka.common.record.{ CompressionType, MemoryRecords, SimpleRecord }
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit.{ After, Before, Test }
+
+class DumpLogSegmentsTest {
+
+ val tmpDir = TestUtils.tempDir()
+ val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+ val logFile = s"$logDir/00000000000000000000.log"
+ val time = new MockTime(0, 0)
+
+ @Before
+ def setUp(): Unit = {
+ val log = Log(logDir, LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+ time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
+ producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+ logDirFailureChannel = new LogDirFailureChannel(10))
+
+ /* append two messages */
+ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,
+ new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
+ log.flush()
+ }
+
+ @After
+ def tearDown(): Unit = {
+ Utils.delete(tmpDir)
+ }
+
+ @Test
+ def testPrintDataLog(): Unit = {
+
+ def verifyRecordsInOutput(args: Array[String]): Unit = {
+ val output = runDumpLogSegments(args)
+ val lines = output.split("\n")
+ assertTrue(s"Data not printed: $output", lines.length > 2)
+ // Verify that the last two lines are message records
+ (0 until 2).foreach { i =>
+ val line = lines(lines.length - 2 + i)
+ assertTrue(s"Not a valid message record: $line", line.startsWith(s"offset: $i position:"))
+ }
+ }
+
+ def verifyNoRecordsInOutput(args: Array[String]): Unit = {
+ val output = runDumpLogSegments(args)
+ assertFalse(s"Data should not have been printed: $output", output.matches("(?s).*offset: [0-9]* position.*"))
+ }
+
+ // Verify that records are printed with --print-data-log even if --deep-iteration is not specified
+ verifyRecordsInOutput(Array("--print-data-log", "--files", logFile))
+ // Verify that records are printed with --print-data-log if --deep-iteration is also specified
+ verifyRecordsInOutput(Array("--print-data-log", "--deep-iteration", "--files", logFile))
+ // Verify that records are printed with --value-decoder even if --print-data-log is not specified
+ verifyRecordsInOutput(Array("--value-decoder-class", "kafka.serializer.StringDecoder", "--files", logFile))
+ // Verify that records are printed with --key-decoder even if --print-data-log is not specified
+ verifyRecordsInOutput(Array("--key-decoder-class", "kafka.serializer.StringDecoder", "--files", logFile))
+ // Verify that records are printed with --deep-iteration even if --print-data-log is not specified
+ verifyRecordsInOutput(Array("--deep-iteration", "--files", logFile))
+
+ // Verify that records are not printed by default
+ verifyNoRecordsInOutput(Array("--files", logFile))
+ }
+
+ private def runDumpLogSegments(args: Array[String]): String = {
+ val outContent = new ByteArrayOutputStream
+ Console.withOut(outContent) {
+ DumpLogSegments.main(args)
+ }
+ outContent.toString
+ }
+}
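Per the assertions in testPrintDataLog above, each record is rendered on its own line starting with its offset and position; the output is expected to look roughly like the following (middle fields elided, values illustrative):

    offset: 0 position: 0 ... payload: hello
    offset: 1 position: 0 ... payload: there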
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b976173..b3ae68f 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -75,6 +75,8 @@
fine-grained timeouts (instead of hard coded retries as in older version).</li>
<li>Kafka Streams rebalance time was reduced further making Kafka Streams more responsive.</li>
<li>Kafka Connect now supports message headers in both sink and source connectors, and to manipulate them via simple message transforms. Connectors must be changed to explicitly use them. A new <code>HeaderConverter</code> is introduced to control how headers are (de)serialized, and the new "SimpleHeaderConverter" is used by default to use string representations of values.</li>
+ <li>kafka.tools.DumpLogSegments now automatically sets the deep-iteration option if print-data-log is enabled,
+ whether explicitly or implicitly via any of the other options, such as the decoder options.</li>
</ul>
<h5><a id="upgrade_110_new_protocols" href="#upgrade_110_new_protocols">New Protocol Versions</a></h5>