You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/08 22:57:27 UTC

git commit: ConsoleProducer does not have the queue-size option; kafka-684; patched by Maxime Brugidou; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 fd94251d8 -> 85c9e91c8


ConsoleProducer does not have the queue-size option; kafka-684; patched by Maxime Brugidou; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85c9e91c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85c9e91c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85c9e91c

Branch: refs/heads/0.8
Commit: 85c9e91c8010c9455ac484c1c679437bd5f43a3c
Parents: fd94251
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 8 13:57:16 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 8 13:57:16 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/producer/ConsoleProducer.scala     |   57 +++++++++++----
 1 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/85c9e91c/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 8c32115..4e2f2af 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -50,6 +50,27 @@ object ConsoleProducer {
                                .describedAs("timeout_ms")
                                .ofType(classOf[java.lang.Long])
                                .defaultsTo(1000)
+    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + 
+                                                   " messages will queue awaiting suffient batch size.")
+                               .withRequiredArg
+                               .describedAs("queue_size")
+                               .ofType(classOf[java.lang.Long])
+                               .defaultsTo(10000)
+    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
+                               .withRequiredArg
+                               .describedAs("queue enqueuetimeout ms")
+                               .ofType(classOf[java.lang.Long])
+                               .defaultsTo(0)
+    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
+                               .withRequiredArg
+                               .describedAs("request required acks")
+                               .ofType(classOf[java.lang.Integer])
+                               .defaultsTo(0)
+    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
+                               .withRequiredArg
+                               .describedAs("request timeout ms")
+                               .ofType(classOf[java.lang.Integer])
+                               .defaultsTo(1500)
     val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
                                  .withRequiredArg
                                  .describedAs("encoder_class")
@@ -88,6 +109,10 @@ object ConsoleProducer {
     val compress = options.has(compressOpt)
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
+    val queueSize = options.valueOf(queueSizeOpt)
+    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
+    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
+    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
     val keyEncoderClass = options.valueOf(keyEncoderOpt)
     val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
@@ -102,6 +127,10 @@ object ConsoleProducer {
     if(options.has(batchSizeOpt))
       props.put("batch.size", batchSize.toString)
     props.put("queue.time", sendTimeout.toString)
+    props.put("queue.size", queueSize.toString)
+    props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
+    props.put("producer.request.required.acks", requestRequiredAcks.toString)
+    props.put("producer.request.timeout.ms", requestTimeoutMs.toString)
     props.put("key.serializer.class", keyEncoderClass)
     props.put("serializer.class", valueEncoderClass)
 
@@ -122,6 +151,7 @@ object ConsoleProducer {
       if(message != null)
         producer.send(message)
     } while(message != null)
+    System.exit(0)
   }
 
   def parseLineReaderArgs(args: Iterable[String]): Properties = {
@@ -163,21 +193,22 @@ object ConsoleProducer {
 
     override def readMessage() = {
       lineNumber += 1
-      val line = reader.readLine()
-      if(parseKey) {
-        line.indexOf(keySeparator) match {
-          case -1 =>
-            if(ignoreError)
-              new KeyedMessage(topic, line)
-            else
-              throw new KafkaException("No key found on line " + lineNumber + ": " + line)
-          case n =>
-            new KeyedMessage(topic,
+      (reader.readLine(), parseKey) match {
+        case (null, _) => null
+        case (line, true) =>
+          line.indexOf(keySeparator) match {
+            case -1 =>
+              if(ignoreError)
+                new KeyedMessage(topic, line)
+              else
+                throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+            case n =>
+              new KeyedMessage(topic,
                              line.substring(0, n), 
                              if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size))
-        }
-      } else {
-        new KeyedMessage(topic, line) 
+          }
+        case (line, false) =>
+          new KeyedMessage(topic, line)
       }
     }
   }