You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/12 03:26:37 UTC
[2/11] git commit: ConsoleProducer does not have the queue-size option; kafka-684; patched by Maxime Brugidou; reviewed by Jun Rao
ConsoleProducer does not have the queue-size option; kafka-684; patched by Maxime Brugidou; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85c9e91c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85c9e91c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85c9e91c
Branch: refs/heads/trunk
Commit: 85c9e91c8010c9455ac484c1c679437bd5f43a3c
Parents: fd94251
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 8 13:57:16 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 8 13:57:16 2013 -0800
----------------------------------------------------------------------
.../scala/kafka/producer/ConsoleProducer.scala | 57 +++++++++++----
1 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/85c9e91c/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 8c32115..4e2f2af 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -50,6 +50,27 @@ object ConsoleProducer {
.describedAs("timeout_ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(1000)
+ val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
+ " messages will queue awaiting suffient batch size.")
+ .withRequiredArg
+ .describedAs("queue_size")
+ .ofType(classOf[java.lang.Long])
+ .defaultsTo(10000)
+ val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
+ .withRequiredArg
+ .describedAs("queue enqueuetimeout ms")
+ .ofType(classOf[java.lang.Long])
+ .defaultsTo(0)
+ val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
+ .withRequiredArg
+ .describedAs("request required acks")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(0)
+ val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
+ .withRequiredArg
+ .describedAs("request timeout ms")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1500)
val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
.withRequiredArg
.describedAs("encoder_class")
@@ -88,6 +109,10 @@ object ConsoleProducer {
val compress = options.has(compressOpt)
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
+ val queueSize = options.valueOf(queueSizeOpt)
+ val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
+ val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
+ val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
val keyEncoderClass = options.valueOf(keyEncoderOpt)
val valueEncoderClass = options.valueOf(valueEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
@@ -102,6 +127,10 @@ object ConsoleProducer {
if(options.has(batchSizeOpt))
props.put("batch.size", batchSize.toString)
props.put("queue.time", sendTimeout.toString)
+ props.put("queue.size", queueSize.toString)
+ props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
+ props.put("producer.request.required.acks", requestRequiredAcks.toString)
+ props.put("producer.request.timeout.ms", requestTimeoutMs.toString)
props.put("key.serializer.class", keyEncoderClass)
props.put("serializer.class", valueEncoderClass)
@@ -122,6 +151,7 @@ object ConsoleProducer {
if(message != null)
producer.send(message)
} while(message != null)
+ System.exit(0)
}
def parseLineReaderArgs(args: Iterable[String]): Properties = {
@@ -163,21 +193,22 @@ object ConsoleProducer {
override def readMessage() = {
lineNumber += 1
- val line = reader.readLine()
- if(parseKey) {
- line.indexOf(keySeparator) match {
- case -1 =>
- if(ignoreError)
- new KeyedMessage(topic, line)
- else
- throw new KafkaException("No key found on line " + lineNumber + ": " + line)
- case n =>
- new KeyedMessage(topic,
+ (reader.readLine(), parseKey) match {
+ case (null, _) => null
+ case (line, true) =>
+ line.indexOf(keySeparator) match {
+ case -1 =>
+ if(ignoreError)
+ new KeyedMessage(topic, line)
+ else
+ throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+ case n =>
+ new KeyedMessage(topic,
line.substring(0, n),
if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size))
- }
- } else {
- new KeyedMessage(topic, line)
+ }
+ case (line, false) =>
+ new KeyedMessage(topic, line)
}
}
}