You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/20 01:20:14 UTC

git commit: KAFKA-815 Improve SimpleConsumerShell to take in a max messages config option; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 a376f9221 -> a737986e5


KAFKA-815 Improve SimpleConsumerShell to take in a max messages config option; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a737986e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a737986e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a737986e

Branch: refs/heads/0.8
Commit: a737986e54ea53a2b93f3d08f5eb7fd155095f3c
Parents: a376f92
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Mar 19 17:19:20 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Mar 19 17:19:29 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsoleConsumer.scala     |    5 ++++
 .../scala/kafka/tools/SimpleConsumerShell.scala    |   19 +++++++++++---
 2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a737986e/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 7e84043..d6c4a51 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -290,6 +290,11 @@ class DefaultMessageFormatter extends MessageFormatter {
   }
 }
 
+class NoOpMessageFormatter extends MessageFormatter {
+  override def init(props: Properties) {}
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
+}
+
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a737986e/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 8f274df..3cfa384 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -84,6 +84,11 @@ object SimpleConsumerShell extends Logging {
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000)
+    val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume")
+                           .withRequiredArg
+                           .describedAs("max-messages")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(Integer.MAX_VALUE)
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
         "skip it instead of halt.")
     val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
@@ -105,6 +110,7 @@ object SimpleConsumerShell extends Logging {
     val fetchSize = options.valueOf(fetchSizeOpt).intValue
     val clientId = options.valueOf(clientIdOpt).toString
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
+    val maxMessages = options.valueOf(maxMessagesOpt).intValue
 
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val printOffsets = if(options.has(printOffsetOpt)) true else false
@@ -181,14 +187,16 @@ object SimpleConsumerShell extends Logging {
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
-    info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
-                 .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
+    val replicaString = if(replicaId > 0) "leader" else "replica"
+    info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
+                 .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
     val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
         var offset = startingOffset
+        var numMessagesConsumed = 0
         try {
-          while(true) {
+          while(numMessagesConsumed < maxMessages) {
             val fetchRequest = fetchRequestBuilder
                     .addFetch(topic, partitionId, offset, fetchSize)
                     .build()
@@ -199,7 +207,7 @@ object SimpleConsumerShell extends Logging {
               return
             }
             debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
-            for(messageAndOffset <- messageSet) {
+            for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
               try {
                 offset = messageAndOffset.nextOffset
                 if(printOffsets)
@@ -207,6 +215,7 @@ object SimpleConsumerShell extends Logging {
                 val message = messageAndOffset.message
                 val key = if(message.hasKey) Utils.readBytes(message.key) else null
                 formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
+                numMessagesConsumed += 1
               } catch {
                 case e =>
                   if (skipMessageOnError)
@@ -226,6 +235,8 @@ object SimpleConsumerShell extends Logging {
         } catch {
           case e: Throwable =>
             error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
+        }finally {
+          info("Consumed " + numMessagesConsumed + " messages")
         }
       }
     }, false)