You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/03/21 01:25:09 UTC
svn commit: r1303232 -
/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
Author: junrao
Date: Wed Mar 21 00:25:09 2012
New Revision: 1303232
URL: http://svn.apache.org/viewvc?rev=1303232&view=rev
Log:
kafka-console-producer does not take in customized values of --batch-size or --timeout; patched by Jun Rao; reviewed by Edward Smith; KAFKA-279
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1303232&r1=1303231&r2=1303232&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala Wed Mar 21 00:25:09 2012
@@ -36,7 +36,7 @@ object ConsoleProducer {
.withRequiredArg
.describedAs("connection_string")
.ofType(classOf[String])
- val asyncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+ val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
.withRequiredArg
@@ -78,7 +78,7 @@ object ConsoleProducer {
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
- val async = options.has(asyncOpt)
+ val sync = options.has(syncOpt)
val compress = options.has(compressOpt)
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
@@ -89,10 +89,10 @@ object ConsoleProducer {
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
- props.put("producer.type", if(async) "async" else "sync")
+ props.put("producer.type", if(sync) "sync" else "async")
if(options.has(batchSizeOpt))
- props.put("batch.size", batchSize)
- props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
+ props.put("batch.size", batchSize.toString)
+ props.put("queue.time", sendTimeout.toString)
props.put("serializer.class", encoderClass)
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]