You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/14 23:31:43 UTC
[1/5] git commit: ConsoleProducer does not exit correctly; kafka-701; patched by Maxime Brugidou; reviewed by Jun Rao
ConsoleProducer does not exit correctly; kafka-701; patched by Maxime Brugidou; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53818bb7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53818bb7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53818bb7
Branch: refs/heads/trunk
Commit: 53818bb7ee8022486eee06a22328200fc5cfced1
Parents: c02e7fd
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Jan 14 09:21:10 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 14 09:21:10 2013 -0800
----------------------------------------------------------------------
config/consumer.properties | 2 +-
config/server.properties | 2 +-
.../scala/kafka/producer/ConsoleProducer.scala | 34 +++++++++------
3 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/config/consumer.properties
----------------------------------------------------------------------
diff --git a/config/consumer.properties b/config/consumer.properties
index 1c43bf9..9dbd583 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -20,7 +20,7 @@
zk.connect=127.0.0.1:2181
# timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
+zk.connection.timeout.ms=1000000
#consumer group id
group.id=test-consumer-group
http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 9a9cd06..04408dd 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -105,7 +105,7 @@ log.cleanup.interval.mins=1
zk.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
+zk.connection.timeout.ms=1000000
# metrics reporter properties
kafka.metrics.polling.interval.secs=5
http://git-wip-us.apache.org/repos/asf/kafka/blob/53818bb7/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 1a98174..8b77465 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -128,7 +128,7 @@ object ConsoleProducer {
props.put("batch.num.messages", batchSize.toString)
props.put("queue.buffering.max.ms", sendTimeout.toString)
props.put("queue.buffering.max.messages", queueSize.toString)
- props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
+ props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
props.put("request.required.acks", requestRequiredAcks.toString)
props.put("request.timeout.ms", requestTimeoutMs.toString)
props.put("key.serializer.class", keyEncoderClass)
@@ -137,20 +137,26 @@ object ConsoleProducer {
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
reader.init(System.in, cmdLineProps)
- val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
+ try {
+ val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
- Runtime.getRuntime.addShutdownHook(new Thread() {
- override def run() {
- producer.close()
- }
- })
-
- var message: KeyedMessage[AnyRef, AnyRef] = null
- do {
- message = reader.readMessage()
- if(message != null)
- producer.send(message)
- } while(message != null)
+ Runtime.getRuntime.addShutdownHook(new Thread() {
+ override def run() {
+ producer.close()
+ }
+ })
+
+ var message: KeyedMessage[AnyRef, AnyRef] = null
+ do {
+ message = reader.readMessage()
+ if(message != null)
+ producer.send(message)
+ } while(message != null)
+ } catch {
+ case e: Exception =>
+ e.printStackTrace
+ System.exit(1)
+ }
System.exit(0)
}