You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/23 00:58:00 UTC
kafka git commit: KAFKA-3256; Add print.timestamp option to console consumer.
Repository: kafka
Updated Branches:
refs/heads/trunk ff7b0f5b4 -> d142f8294
KAFKA-3256; Add print.timestamp option to console consumer.
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #949 from becketqin/KAFKA-3256
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d142f829
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d142f829
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d142f829
Branch: refs/heads/trunk
Commit: d142f8294af67fea20d77dcc5272770af153c0d9
Parents: ff7b0f5
Author: Jiangjie Qin <be...@gmail.com>
Authored: Mon Feb 22 15:57:55 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Feb 22 15:57:55 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d142f829/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index fe2ce9f..0ae057f 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -214,7 +214,7 @@ object ConsoleConsumer extends Logging {
.describedAs("class")
.ofType(classOf[String])
.defaultsTo(classOf[DefaultMessageFormatter].getName)
- val messageFormatterArgOpt = parser.accepts("property")
+ val messageFormatterArgOpt = parser.accepts("property", "The properties to initialize the message formatter.")
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
@@ -345,10 +345,13 @@ trait MessageFormatter{
class DefaultMessageFormatter extends MessageFormatter {
var printKey = false
+ var printTimestamp = false
var keySeparator = "\t".getBytes
var lineSeparator = "\n".getBytes
override def init(props: Properties) {
+ if (props.containsKey("print.timestamp"))
+ printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true")
if (props.containsKey("print.key"))
printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
if (props.containsKey("key.separator"))
@@ -358,8 +361,11 @@ class DefaultMessageFormatter extends MessageFormatter {
}
def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
- if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
- output.write(s"$timestampType:$timestamp".getBytes)
+ if (printTimestamp) {
+ if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
+ output.write(s"$timestampType:$timestamp".getBytes)
+ else
+ output.write(s"NO_TIMESTAMP".getBytes)
output.write(keySeparator)
}
if (printKey) {