You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/11 11:39:53 UTC
kafka git commit: KAFKA-4866;
Console consumer `print.value` property is ignored
Repository: kafka
Updated Branches:
refs/heads/trunk d2f5589af -> 8e7516ea2
KAFKA-4866; Console consumer `print.value` property is ignored
This property is mentioned in the quickstart.
Author: huxi <hu...@zhenrongbao.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2661 from amethystic/kafka4866_consoleconsumer_ignore_printvalue
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e7516ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e7516ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e7516ea
Branch: refs/heads/trunk
Commit: 8e7516ea2e6b99a6e9a3fb5a958e62b1fb186cf9
Parents: d2f5589
Author: huxi <hu...@zhenrongbao.com>
Authored: Tue Apr 11 11:31:11 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 11 12:38:09 2017 +0100
----------------------------------------------------------------------
.../scala/kafka/tools/ConsoleConsumer.scala | 26 ++++++++++++++++----
.../unit/kafka/tools/ConsoleConsumerTest.scala | 7 +++---
2 files changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e7516ea/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 56f125a..393fee6 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -414,6 +414,7 @@ object ConsoleConsumer extends Logging {
class DefaultMessageFormatter extends MessageFormatter {
var printKey = false
+ var printValue = true
var printTimestamp = false
var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
@@ -426,6 +427,8 @@ class DefaultMessageFormatter extends MessageFormatter {
printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
if (props.containsKey("print.key"))
printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
+ if (props.containsKey("print.value"))
+ printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
if (props.containsKey("line.separator"))
@@ -440,12 +443,18 @@ class DefaultMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
- def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {
+ def writeSeparator(columnSeparator: Boolean): Unit = {
+ if (columnSeparator)
+ output.write(keySeparator)
+ else
+ output.write(lineSeparator)
+ }
+
+ def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
output.write(convertedBytes)
- output.write(separator)
}
import consumerRecord._
@@ -455,11 +464,18 @@ class DefaultMessageFormatter extends MessageFormatter {
output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8))
else
output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8))
- output.write(keySeparator)
+ writeSeparator(printKey || printValue)
}
- if (printKey) write(keyDeserializer, key, keySeparator)
- write(valueDeserializer, value, lineSeparator)
+ if (printKey) {
+ write(keyDeserializer, key)
+ writeSeparator(printValue)
+ }
+
+ if (printValue) {
+ write(valueDeserializer, value)
+ output.write(lineSeparator)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e7516ea/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 013ed3e..e0917a2 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -25,9 +25,8 @@ import kafka.utils.TestUtils
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
-import org.scalatest.junit.JUnitSuite
-class ConsoleConsumerTest extends JUnitSuite {
+class ConsoleConsumerTest {
@Test
def shouldLimitReadsToMaxMessageLimit() {
@@ -160,7 +159,8 @@ class ConsoleConsumerTest extends JUnitSuite {
"--topic", "test",
"--partition", "0",
"--offset", "LatEst",
- "--new-consumer") //new
+ "--new-consumer", //new
+ "--property", "print.value=false")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
@@ -172,6 +172,7 @@ class ConsoleConsumerTest extends JUnitSuite {
assertEquals(0, config.partitionArg.get)
assertEquals(-1, config.offsetArg)
assertEquals(false, config.fromBeginning)
+ assertEquals(false, config.formatter.asInstanceOf[DefaultMessageFormatter].printValue)
}
@Test