You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/14 18:21:29 UTC

git commit: ConsoleProducer does not exit correctly; kafka-701; patched by Maxime Brugidou; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 c02e7fd69 -> 53818bb7e


ConsoleProducer does not exit correctly; kafka-701; patched by Maxime Brugidou; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53818bb7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53818bb7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53818bb7

Branch: refs/heads/0.8
Commit: 53818bb7ee8022486eee06a22328200fc5cfced1
Parents: c02e7fd
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Jan 14 09:21:10 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 14 09:21:10 2013 -0800

----------------------------------------------------------------------
 config/consumer.properties                         |    2 +-
 config/server.properties                           |    2 +-
 .../scala/kafka/producer/ConsoleProducer.scala     |   34 +++++++++------
 3 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/config/consumer.properties
----------------------------------------------------------------------
diff --git a/config/consumer.properties b/config/consumer.properties
index 1c43bf9..9dbd583 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -20,7 +20,7 @@
 zk.connect=127.0.0.1:2181
 
 # timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
+zk.connection.timeout.ms=1000000
 
 #consumer group id
 group.id=test-consumer-group

http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 9a9cd06..04408dd 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -105,7 +105,7 @@ log.cleanup.interval.mins=1
 zk.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
+zk.connection.timeout.ms=1000000
 
 # metrics reporter properties
 kafka.metrics.polling.interval.secs=5

http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 1a98174..8b77465 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -128,7 +128,7 @@ object ConsoleProducer {
       props.put("batch.num.messages", batchSize.toString)
     props.put("queue.buffering.max.ms", sendTimeout.toString)
     props.put("queue.buffering.max.messages", queueSize.toString)
-    props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
+    props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
     props.put("request.required.acks", requestRequiredAcks.toString)
     props.put("request.timeout.ms", requestTimeoutMs.toString)
     props.put("key.serializer.class", keyEncoderClass)
@@ -137,20 +137,26 @@ object ConsoleProducer {
     val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
     reader.init(System.in, cmdLineProps)
 
-    val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
+    try {
+        val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
 
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        producer.close()
-      }
-    })
-
-    var message: KeyedMessage[AnyRef, AnyRef] = null
-    do { 
-      message = reader.readMessage()
-      if(message != null)
-        producer.send(message)
-    } while(message != null)
+        Runtime.getRuntime.addShutdownHook(new Thread() {
+          override def run() {
+            producer.close()
+          }
+        })
+
+        var message: KeyedMessage[AnyRef, AnyRef] = null
+        do {
+          message = reader.readMessage()
+          if(message != null)
+            producer.send(message)
+        } while(message != null)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace
+        System.exit(1)
+    }
     System.exit(0)
   }