You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Hyun-Gul Roh (JIRA)" <ji...@apache.org> on 2013/11/21 07:37:35 UTC

[jira] [Created] (KAFKA-1138) Remote producer uses the hostname defined in broker

Hyun-Gul Roh created KAFKA-1138:
-----------------------------------

             Summary: Remote producer uses the hostname defined in broker
                 Key: KAFKA-1138
                 URL: https://issues.apache.org/jira/browse/KAFKA-1138
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 0.8
            Reporter: Hyun-Gul Roh
            Assignee: Jun Rao


When the producer API in the node which is not the broker sends message to a broker, only TopicMetadataRequest is sent, but ProducerRequest is not by observing the log of "kafka-request.log"
According to my analysis, when the producer api sends ProducerRequest, it seems to use the hostname defined in the broker. So, if the hostname is not the one registered in DNS, the producer cannot send the ProducerRequest. 


I am attaching the log:

[2013-11-21 15:28:49,464] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:49,465] INFO Back off for 100 ms before retrying send. Remaining retries = 1 (kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:49,566] INFO Fetching metadata from broker id:0,host:111.111.111.111,port:9092 with correlation id 6 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
[2013-11-21 15:28:49,819] ERROR Producer connection to 111.111.111.111:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: 연결이 거부됨
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:465)
	at sun.nio.ch.Net.connect(Net.java:457)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
	at kafka.utils.Utils$.swallow(Utils.scala:186)
	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:49,821] WARN Fetching topic metadata with correlation id 6 for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: 연결이 거부됨
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:465)
	at sun.nio.ch.Net.connect(Net.java:457)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
	at kafka.utils.Utils$.swallow(Utils.scala:186)
	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:49,822] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
	at kafka.utils.Utils$.swallow(Utils.scala:186)
	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: 연결이 거부됨
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:465)
	at sun.nio.ch.Net.connect(Net.java:457)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
	... 12 more
[2013-11-21 15:28:49,825] INFO Fetching metadata from broker id:0,host:111.111.111.111,port:9092 with correlation id 7 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
[2013-11-21 15:28:50,021] ERROR Producer connection to 111.111.111.111:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: 연결이 거부됨
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:465)
	at sun.nio.ch.Net.connect(Net.java:457)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:50,024] WARN Fetching topic metadata with correlation id 7 for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: 연결이 거부됨
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:465)
	at sun.nio.ch.Net.connect(Net.java:457)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:50,025] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:50,026] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:50,127] INFO Fetching metadata from broker id:0,host:111.111.111.111,port:9092 with correlation id 8 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
[2013-11-21 15:28:50,324] ERROR Producer connection to 111.111.111.111:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: 연결이 거부됨
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:465)
	at sun.nio.ch.Net.connect(Net.java:457)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
	at kafka.utils.Utils$.swallow(Utils.scala:186)
	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:50,326] WARN Fetching topic metadata with correlation id 8 for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed (kafka.client.ClientUtils$)
java.net.ConnectException: 연결이 거부됨
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:465)
	at sun.nio.ch.Net.connect(Net.java:457)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
	at kafka.utils.Utils$.swallow(Utils.scala:186)
	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:50,328] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
	at kafka.utils.Utils$.swallow(Utils.scala:186)
	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: 연결이 거부됨
	at sun.nio.ch.Net.connect0(Native Method)
	at sun.nio.ch.Net.connect(Net.java:465)
	at sun.nio.ch.Net.connect(Net.java:457)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
	... 12 more
[2013-11-21 15:28:50,332] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:50,333] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
       




--
This message was sent by Atlassian JIRA
(v6.1#6144)