You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/03/21 01:25:09 UTC

svn commit: r1303232 - /incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala

Author: junrao
Date: Wed Mar 21 00:25:09 2012
New Revision: 1303232

URL: http://svn.apache.org/viewvc?rev=1303232&view=rev
Log:
kafka-console-producer does not take in customized values of --batch-size or --timeout; patched by Jun Rao; reviewed by Edward Smith; KAFKA-279

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1303232&r1=1303231&r2=1303232&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala Wed Mar 21 00:25:09 2012
@@ -36,7 +36,7 @@ object ConsoleProducer { 
                            .withRequiredArg
                            .describedAs("connection_string")
                            .ofType(classOf[String])
-    val asyncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
     val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
                              .withRequiredArg
@@ -78,7 +78,7 @@ object ConsoleProducer { 
 
     val topic = options.valueOf(topicOpt)
     val zkConnect = options.valueOf(zkConnectOpt)
-    val async = options.has(asyncOpt)
+    val sync = options.has(syncOpt)
     val compress = options.has(compressOpt)
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
@@ -89,10 +89,10 @@ object ConsoleProducer { 
     val props = new Properties()
     props.put("zk.connect", zkConnect)
     props.put("compression.codec", DefaultCompressionCodec.codec.toString)
-    props.put("producer.type", if(async) "async" else "sync")
+    props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))
-      props.put("batch.size", batchSize)
-    props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
+      props.put("batch.size", batchSize.toString)
+    props.put("queue.time", sendTimeout.toString)
     props.put("serializer.class", encoderClass)
 
     val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]