You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/23 00:58:00 UTC

kafka git commit: KAFKA-3256; Add print.timestamp option to console consumer.

Repository: kafka
Updated Branches:
  refs/heads/trunk ff7b0f5b4 -> d142f8294


KAFKA-3256; Add print.timestamp option to console consumer.

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #949 from becketqin/KAFKA-3256


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d142f829
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d142f829
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d142f829

Branch: refs/heads/trunk
Commit: d142f8294af67fea20d77dcc5272770af153c0d9
Parents: ff7b0f5
Author: Jiangjie Qin <be...@gmail.com>
Authored: Mon Feb 22 15:57:55 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Feb 22 15:57:55 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d142f829/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index fe2ce9f..0ae057f 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -214,7 +214,7 @@ object ConsoleConsumer extends Logging {
       .describedAs("class")
       .ofType(classOf[String])
       .defaultsTo(classOf[DefaultMessageFormatter].getName)
-    val messageFormatterArgOpt = parser.accepts("property")
+    val messageFormatterArgOpt = parser.accepts("property", "The properties to initialize the message formatter.")
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
@@ -345,10 +345,13 @@ trait MessageFormatter{
 
 class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
+  var printTimestamp = false
   var keySeparator = "\t".getBytes
   var lineSeparator = "\n".getBytes
 
   override def init(props: Properties) {
+    if (props.containsKey("print.timestamp"))
+      printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true")
     if (props.containsKey("print.key"))
       printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
     if (props.containsKey("key.separator"))
@@ -358,8 +361,11 @@ class DefaultMessageFormatter extends MessageFormatter {
   }
 
   def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
-    if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
-      output.write(s"$timestampType:$timestamp".getBytes)
+    if (printTimestamp) {
+      if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
+        output.write(s"$timestampType:$timestamp".getBytes)
+      else
+        output.write(s"NO_TIMESTAMP".getBytes)
       output.write(keySeparator)
     }
     if (printKey) {