You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2017/10/30 12:30:00 UTC

[jira] [Resolved] (KAFKA-1066) Reduce logging on producer connection failures

     [ https://issues.apache.org/jira/browse/KAFKA-1066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-1066.
------------------------------
    Resolution: Auto Closed

Closing inactive issue. The old producer is no longer supported

> Reduce logging on producer connection failures
> ----------------------------------------------
>
>                 Key: KAFKA-1066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1066
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.0
>            Reporter: Jason Rosenberg
>            Assignee: Jun Rao
>
> Below is a stack trace from a unit test, where a producer tries to send a message, but no server is available.
> The exception/message logging seems to be inordinately verbose.  
> I'm thinking a simple change could be to not log full stack traces for simple things like "Connection refused", etc.  Seems it would be fine to just log the exception message in such cases.
> Also, the log levels could be tuned, such that things logged as ERROR indicate that all possible retries have been attempted, rather than having it be an ERROR for each step of the retry/failover process.  Thus, for a redundant, clustered service, it should be considered normal that single nodes will be unavailable (such as when we're doing a rolling restart of the cluster, etc.).  It should only be an ERROR if all brokers/all replicas are unavailable, etc.  This way, we can selectively set our log level to ERROR, and have it be useful.
> This is from one of my unit tests.  I am using the default retry count (3) here, but even if I reduced that, it seems this is a crazy amount of logging.  (I've edited this to remove from each stack trace the portion of testing/calling code, into the Producer.send() call).
> Here's the code snippet that produced the logging below (and note, the server was not available on the requested port).
>       KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
>       producer.send(msg);
> Jason
> 599 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 615 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 0 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 628 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> Caused by: java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	... 32 more
> 642 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 653 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 1 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 661 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 765 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 770 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 2 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 775 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> Caused by: java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	... 32 more
> 780 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 785 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 3 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 790 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 892 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 896 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 4 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 900 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> Caused by: java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	... 32 more
> 906 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 913 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 5 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 919 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 1021 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 1027 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 6 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 1032 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> Caused by: java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	... 32 more
> 1037 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 1041 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 7 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> 1047 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 1148 [main] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>         ...
> 1152 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 8 for topics [Set(test-topic)] from broker [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>        ...
> 1157 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...
> Caused by: java.net.ConnectException: Connection refused
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	... 32 more
> 1163 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics test-topic with correlation ids in [0,8]
> 1163 [main] WARN com.squareup.kafka.ng.producer.KafkaProducer  - Error sending data to kafka
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> 	at kafka.producer.Producer.send(Producer.scala:74)
> 	at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> 	...



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)