You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by kaybin wong <ka...@gmail.com> on 2015/04/13 04:13:23 UTC

java consumer client sometimes work,but sometimes not.

Hi there.
I have a real issue: the consumer (Java client) sometimes works, but sometimes
does not.
I have read the source code but found nothing. Can you help me?
--------------------------------------------log------------------------------------
[root@slave3 kafka_2.10-0.8.2.0]# ./bin/kafka-console-producer.sh
--broker-list 192.168.1.159:2181 --topic testX
[2015-04-13 10:11:03,218] WARN Property topic is not valid
(kafka.utils.VerifiableProperties)
a^Hte
[2015-04-13 10:11:26,441] WARN Fetching topic metadata with correlation id
0 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,446] ERROR fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.io.EOFException: Received -1 when reading from channel,
socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
... 12 more
[2015-04-13 10:11:26,452] WARN Fetching topic metadata with correlation id
1 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,455] ERROR Failed to collate messages by topic,
partition due to: fetching topic metadata for topics [Set(testX)] from
broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
(kafka.producer.async.DefaultEventHandler)
[2015-04-13 10:11:26,560] WARN Fetching topic metadata with correlation id
2 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,562] ERROR fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.io.EOFException: Received -1 when reading from channel,
socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
... 12 more
[2015-04-13 10:11:26,566] WARN Fetching topic metadata with correlation id
3 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,568] ERROR Failed to collate messages by topic,
partition due to: fetching topic metadata for topics [Set(testX)] from
broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
(kafka.producer.async.DefaultEventHandler)
[2015-04-13 10:11:26,670] WARN Fetching topic metadata with correlation id
4 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,672] ERROR fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.io.EOFException: Received -1 when reading from channel,
socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
... 12 more
[2015-04-13 10:11:26,676] WARN Fetching topic metadata with correlation id
5 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,677] ERROR Failed to collate messages by topic,
partition due to: fetching topic metadata for topics [Set(testX)] from
broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
(kafka.producer.async.DefaultEventHandler)
[2015-04-13 10:11:26,780] WARN Fetching topic metadata with correlation id
6 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,781] ERROR fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.io.EOFException: Received -1 when reading from channel,
socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
... 12 more
[2015-04-13 10:11:26,784] WARN Fetching topic metadata with correlation id
7 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,786] ERROR Failed to collate messages by topic,
partition due to: fetching topic metadata for topics [Set(testX)] from
broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
(kafka.producer.async.DefaultEventHandler)
[2015-04-13 10:11:26,888] WARN Fetching topic metadata with correlation id
8 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-04-13 10:11:26,889] ERROR fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.io.EOFException: Received -1 when reading from channel,
socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
... 12 more
[2015-04-13 10:11:26,893] ERROR Failed to send requests for topics testX
with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2015-04-13 10:11:26,894] ERROR Error in handling batch of 1 events
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Re: java consumer client sometimes work,but sometimes not.

Posted by Guozhang Wang <wa...@gmail.com>.
BTW it seems you are referring to the producer not the consumer as claimed
in the title.

On Mon, Apr 13, 2015 at 5:54 PM, François Méthot <fm...@gmail.com>
wrote:

> It could be that the maximum number of connections was reached for the client
> IP you are using.  The default is 10, but it can be changed.  I had similar
> intermittent issues because of that.
>
> The property is max.connections.per.ip
>
> Le 2015-04-12 11:20 PM, "kaybin wong" <ka...@gmail.com> a écrit :
> >
> > hi there.
> > i got a really issue,consumer(java client) sometimes works,but sometimes
> > not.
> > i had read the sources code, bug got nothing,can u help me ?
> >
>
> --------------------------------------------log------------------------------------
> > [root@slave3 kafka_2.10-0.8.2.0]# ./bin/kafka-console-producer.sh
> > --broker-list 192.168.1.159:2181 --topic testX
> > [2015-04-13 10:11:03,218] WARN Property topic is not valid
> > (kafka.utils.VerifiableProperties)
> > a^Hte
> > [2015-04-13 10:11:26,441] WARN Fetching topic metadata with correlation
> id
> > 0 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,446] ERROR fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed (kafka.utils.Utils$)
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > Caused by: java.io.EOFException: Received -1 when reading from channel,
> > socket has likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > ... 12 more
> > [2015-04-13 10:11:26,452] WARN Fetching topic metadata with correlation
> id
> > 1 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> > at
> >
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,455] ERROR Failed to collate messages by topic,
> > partition due to: fetching topic metadata for topics [Set(testX)] from
> > broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> > (kafka.producer.async.DefaultEventHandler)
> > [2015-04-13 10:11:26,560] WARN Fetching topic metadata with correlation
> id
> > 2 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,562] ERROR fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed (kafka.utils.Utils$)
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > Caused by: java.io.EOFException: Received -1 when reading from channel,
> > socket has likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > ... 12 more
> > [2015-04-13 10:11:26,566] WARN Fetching topic metadata with correlation
> id
> > 3 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> > at
> >
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,568] ERROR Failed to collate messages by topic,
> > partition due to: fetching topic metadata for topics [Set(testX)] from
> > broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> > (kafka.producer.async.DefaultEventHandler)
> > [2015-04-13 10:11:26,670] WARN Fetching topic metadata with correlation
> id
> > 4 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,672] ERROR fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed (kafka.utils.Utils$)
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > Caused by: java.io.EOFException: Received -1 when reading from channel,
> > socket has likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > ... 12 more
> > [2015-04-13 10:11:26,676] WARN Fetching topic metadata with correlation
> id
> > 5 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> > at
> >
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,677] ERROR Failed to collate messages by topic,
> > partition due to: fetching topic metadata for topics [Set(testX)] from
> > broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> > (kafka.producer.async.DefaultEventHandler)
> > [2015-04-13 10:11:26,780] WARN Fetching topic metadata with correlation
> id
> > 6 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,781] ERROR fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed (kafka.utils.Utils$)
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > Caused by: java.io.EOFException: Received -1 when reading from channel,
> > socket has likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > ... 12 more
> > [2015-04-13 10:11:26,784] WARN Fetching topic metadata with correlation
> id
> > 7 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> > at
> >
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,786] ERROR Failed to collate messages by topic,
> > partition due to: fetching topic metadata for topics [Set(testX)] from
> > broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> > (kafka.producer.async.DefaultEventHandler)
> > [2015-04-13 10:11:26,888] WARN Fetching topic metadata with correlation
> id
> > 8 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> > failed (kafka.client.ClientUtils$)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2015-04-13 10:11:26,889] ERROR fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed (kafka.utils.Utils$)
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > Caused by: java.io.EOFException: Received -1 when reading from channel,
> > socket has likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:381)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > at
> >
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > ... 12 more
> > [2015-04-13 10:11:26,893] ERROR Failed to send requests for topics testX
> > with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> > [2015-04-13 10:11:26,894] ERROR Error in handling batch of 1 events
> > (kafka.producer.async.ProducerSendThread)
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 3
> > tries.
> > at
> >
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > at
> >
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > at
> >
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>



-- 
-- Guozhang

Re: java consumer client sometimes work,but sometimes not.

Posted by François Méthot <fm...@gmail.com>.
It could be that the maximum number of connections was reached for the client
IP you are using.  The default is 10, but it can be changed.  I had similar
intermittent issues because of that.

The property is max.connections.per.ip

Le 2015-04-12 11:20 PM, "kaybin wong" <ka...@gmail.com> a écrit :
>
> hi there.
> i got a really issue,consumer(java client) sometimes works,but sometimes
> not.
> i had read the sources code, bug got nothing,can u help me ?
>
--------------------------------------------log------------------------------------
> [root@slave3 kafka_2.10-0.8.2.0]# ./bin/kafka-console-producer.sh
> --broker-list 192.168.1.159:2181 --topic testX
> [2015-04-13 10:11:03,218] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
> a^Hte
> [2015-04-13 10:11:26,441] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,446] ERROR fetching topic metadata for topics
> [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.io.EOFException: Received -1 when reading from channel,
> socket has likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-04-13 10:11:26,452] WARN Fetching topic metadata with correlation id
> 1 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
>
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
>
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
>
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
>
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,455] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(testX)] from
> broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2015-04-13 10:11:26,560] WARN Fetching topic metadata with correlation id
> 2 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,562] ERROR fetching topic metadata for topics
> [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.io.EOFException: Received -1 when reading from channel,
> socket has likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-04-13 10:11:26,566] WARN Fetching topic metadata with correlation id
> 3 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
>
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
>
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
>
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
>
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,568] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(testX)] from
> broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2015-04-13 10:11:26,670] WARN Fetching topic metadata with correlation id
> 4 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,672] ERROR fetching topic metadata for topics
> [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.io.EOFException: Received -1 when reading from channel,
> socket has likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-04-13 10:11:26,676] WARN Fetching topic metadata with correlation id
> 5 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
>
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
>
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
>
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
>
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,677] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(testX)] from
> broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2015-04-13 10:11:26,780] WARN Fetching topic metadata with correlation id
> 6 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,781] ERROR fetching topic metadata for topics
> [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)]
> failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.io.EOFException: Received -1 when reading from channel,
> socket has likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-04-13 10:11:26,784] WARN Fetching topic metadata with correlation id
> 7 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181]
> failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
>
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at
>
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
>
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
>
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
>
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
>
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
>
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
>
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
>
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
>
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
>
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
>
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,786] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(testX)] from
> broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2015-04-13 10:11:26,888] WARN Fetching topic metadata with correlation id 8 for topics [Set(testX)] from broker [id:0,host:192.168.1.159,port:2181] failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-04-13 10:11:26,889] ERROR fetching topic metadata for topics [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics [Set(testX)] from broker [ArrayBuffer(id:0,host:192.168.1.159,port:2181)] failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-04-13 10:11:26,893] ERROR Failed to send requests for topics testX with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> [2015-04-13 10:11:26,894] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)