You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/20 01:20:14 UTC
git commit: KAFKA-815 Improve SimpleConsumerShell to take in a max
messages config option; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 a376f9221 -> a737986e5
KAFKA-815 Improve SimpleConsumerShell to take in a max messages config option; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a737986e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a737986e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a737986e
Branch: refs/heads/0.8
Commit: a737986e54ea53a2b93f3d08f5eb7fd155095f3c
Parents: a376f92
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Mar 19 17:19:20 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Mar 19 17:19:29 2013 -0700
----------------------------------------------------------------------
.../scala/kafka/consumer/ConsoleConsumer.scala | 5 ++++
.../scala/kafka/tools/SimpleConsumerShell.scala | 19 +++++++++++---
2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a737986e/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 7e84043..d6c4a51 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -290,6 +290,11 @@ class DefaultMessageFormatter extends MessageFormatter {
}
}
+class NoOpMessageFormatter extends MessageFormatter {
+ override def init(props: Properties) {}
+ def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
+}
+
class ChecksumMessageFormatter extends MessageFormatter {
private var topicStr: String = _
http://git-wip-us.apache.org/repos/asf/kafka/blob/a737986e/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 8f274df..3cfa384 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -84,6 +84,11 @@ object SimpleConsumerShell extends Logging {
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
+ val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume")
+ .withRequiredArg
+ .describedAs("max-messages")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(Integer.MAX_VALUE)
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.")
val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
@@ -105,6 +110,7 @@ object SimpleConsumerShell extends Logging {
val fetchSize = options.valueOf(fetchSizeOpt).intValue
val clientId = options.valueOf(clientIdOpt).toString
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
+ val maxMessages = options.valueOf(maxMessagesOpt).intValue
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
val printOffsets = if(options.has(printOffsetOpt)) true else false
@@ -181,14 +187,16 @@ object SimpleConsumerShell extends Logging {
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
formatter.init(formatterArgs)
- info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
- .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
+ val replicaString = if(replicaId > 0) "leader" else "replica"
+ info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
+ .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
def run() {
var offset = startingOffset
+ var numMessagesConsumed = 0
try {
- while(true) {
+ while(numMessagesConsumed < maxMessages) {
val fetchRequest = fetchRequestBuilder
.addFetch(topic, partitionId, offset, fetchSize)
.build()
@@ -199,7 +207,7 @@ object SimpleConsumerShell extends Logging {
return
}
debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
- for(messageAndOffset <- messageSet) {
+ for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
try {
offset = messageAndOffset.nextOffset
if(printOffsets)
@@ -207,6 +215,7 @@ object SimpleConsumerShell extends Logging {
val message = messageAndOffset.message
val key = if(message.hasKey) Utils.readBytes(message.key) else null
formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
+ numMessagesConsumed += 1
} catch {
case e =>
if (skipMessageOnError)
@@ -226,6 +235,8 @@ object SimpleConsumerShell extends Logging {
} catch {
case e: Throwable =>
error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
+ }finally {
+ info("Consumed " + numMessagesConsumed + " messages")
}
}
}, false)