You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/11 11:39:53 UTC

kafka git commit: KAFKA-4866; Console consumer `print.value` property is ignored

Repository: kafka
Updated Branches:
  refs/heads/trunk d2f5589af -> 8e7516ea2


KAFKA-4866; Console consumer `print.value` property is ignored

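The quickstart mentions the `print.value` property, but `DefaultMessageFormatter` never read it, so the record value was always printed. The formatter now honors `print.value` (default `true`).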

Author: huxi <hu...@zhenrongbao.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2661 from amethystic/kafka4866_consoleconsumer_ignore_printvalue


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e7516ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e7516ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e7516ea

Branch: refs/heads/trunk
Commit: 8e7516ea2e6b99a6e9a3fb5a958e62b1fb186cf9
Parents: d2f5589
Author: huxi <hu...@zhenrongbao.com>
Authored: Tue Apr 11 11:31:11 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 11 12:38:09 2017 +0100

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 26 ++++++++++++++++----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  |  7 +++---
 2 files changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e7516ea/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 56f125a..393fee6 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -414,6 +414,7 @@ object ConsoleConsumer extends Logging {
 
 class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
+  var printValue = true
   var printTimestamp = false
   var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
   var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
@@ -426,6 +427,8 @@ class DefaultMessageFormatter extends MessageFormatter {
       printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
     if (props.containsKey("print.key"))
       printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
+    if (props.containsKey("print.value"))
+      printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true")
     if (props.containsKey("key.separator"))
       keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
     if (props.containsKey("line.separator"))
@@ -440,12 +443,18 @@ class DefaultMessageFormatter extends MessageFormatter {
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
 
-    def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {
+    def writeSeparator(columnSeparator: Boolean): Unit = {
+      if (columnSeparator)
+        output.write(keySeparator)
+      else
+        output.write(lineSeparator)
+    }
+
+    def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
       val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
       val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
         getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
       output.write(convertedBytes)
-      output.write(separator)
     }
 
     import consumerRecord._
@@ -455,11 +464,18 @@ class DefaultMessageFormatter extends MessageFormatter {
         output.write(s"$timestampType:$timestamp".getBytes(StandardCharsets.UTF_8))
       else
         output.write(s"NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8))
-      output.write(keySeparator)
+      writeSeparator(printKey || printValue)
     }
 
-    if (printKey) write(keyDeserializer, key, keySeparator)
-    write(valueDeserializer, value, lineSeparator)
+    if (printKey) {
+      write(keyDeserializer, key)
+      writeSeparator(printValue)
+    }
+
+    if (printValue) {
+      write(valueDeserializer, value)
+      output.write(lineSeparator)
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e7516ea/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 013ed3e..e0917a2 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -25,9 +25,8 @@ import kafka.utils.TestUtils
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
-import org.scalatest.junit.JUnitSuite
 
-class ConsoleConsumerTest extends JUnitSuite {
+class ConsoleConsumerTest {
 
   @Test
   def shouldLimitReadsToMaxMessageLimit() {
@@ -160,7 +159,8 @@ class ConsoleConsumerTest extends JUnitSuite {
       "--topic", "test",
       "--partition", "0",
       "--offset", "LatEst",
-      "--new-consumer") //new
+      "--new-consumer", //new
+      "--property", "print.value=false")
 
     //When
     val config = new ConsoleConsumer.ConsumerConfig(args)
@@ -172,6 +172,7 @@ class ConsoleConsumerTest extends JUnitSuite {
     assertEquals(0, config.partitionArg.get)
     assertEquals(-1, config.offsetArg)
     assertEquals(false, config.fromBeginning)
+    assertEquals(false, config.formatter.asInstanceOf[DefaultMessageFormatter].printValue)
   }
 
   @Test