Posted to users@kafka.apache.org by Jun Rao <ju...@confluent.io> on 2015/02/03 06:25:55 UTC
Re: Consumers closing sockets abruptly?
Is there another broker running on that IP? If the replication factor is
larger than 1, the follower fetches data from the leader just like a
regular consumer, so the closed connection may belong to a replica fetcher.
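
One rough way to check this is to pull the peer address out of the broker
log line and compare it against the broker list. The sketch below assumes the
log line has the "Closing socket for /X because of error" shape quoted in this
thread. The class name, method names, and IP addresses are hypothetical and
are not part of Kafka.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PeerCheck {
    // Assumed log shape, taken from the message quoted in this thread:
    // "Closing socket for /<ip> because of error"
    private static final Pattern CLOSING =
        Pattern.compile("Closing socket for /(\\S+) because of error");

    /** Returns the peer address found in the log line, or null if the line does not match. */
    static String peerOf(String logLine) {
        Matcher m = CLOSING.matcher(logLine);
        return m.find() ? m.group(1) : null;
    }

    /** True when the peer named in the log line is one of the known broker addresses. */
    static boolean isBrokerPeer(String logLine, Set<String> brokerIps) {
        String peer = peerOf(logLine);
        return peer != null && brokerIps.contains(peer);
    }

    public static void main(String[] args) {
        // Hypothetical broker addresses; substitute the real broker list.
        Set<String> brokerIps = new HashSet<>(Arrays.asList("10.0.0.11", "10.0.0.12"));
        String line = "Closing socket for /10.0.0.12 because of error";
        System.out.println(peerOf(line) + " is a broker? " + isBrokerPeer(line, brokerIps));
    }
}
```

If the peer turns out to be a broker address, the connection is more likely a
follower's fetch than a consumer's.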
Thanks,
Jun
On Tue, Jan 27, 2015 at 9:52 AM, Scott Reynolds <sr...@twilio.com>
wrote:
> On my brokers I am seeing this error log message:
>
> Closing socket for /X because of error (X is the IP address of a consumer)
> > 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> > 2015-01-27_17:32:58.29890 at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > 2015-01-27_17:32:58.29891 at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> > 2015-01-27_17:32:58.29892 at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> > 2015-01-27_17:32:58.29892 at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> > 2015-01-27_17:32:58.29893 at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> > 2015-01-27_17:32:58.29893 at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29894 at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> > 2015-01-27_17:32:58.29895 at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29895 at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> > 2015-01-27_17:32:58.29896 at kafka.network.Processor.write(SocketServer.scala:375)
> > 2015-01-27_17:32:58.29896 at kafka.network.Processor.run(SocketServer.scala:247)
> > 2015-01-27_17:32:58.29897 at java.lang.Thread.run(Thread.java:745)
> >
>
> This is because the Processor doesn't handle java.io.IOException explicitly,
> so it falls through to the catch-all handler.
>
> My consumers actually seem perfectly healthy, so I don't think there is a
> real issue here, but I could use some help confirming that.
>
> We are using the Java consumer like so:
>
> > final ConsumerConnector consumer =
> >     kafka.consumer.Consumer.createJavaConsumerConnector(config);
> > // Request a single stream (one consumer thread) for the topic.
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > topicCountMap.put(topicName, 1);
> > final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >     consumer.createMessageStreams(topicCountMap);
> > // Take the only stream created for the topic.
> > final KafkaStream<byte[], byte[]> stream =
> >     consumerMap.get(topicName).get(0);
> >
>
> and then we just iterate over the stream.
>
> Questions:
> 1. What class is the one that makes the network connection to the consumer?
> 2. Do legitimate cases exist where the consumer would close its socket
> connection? ZooKeeper issues? Consumer too far behind?
>
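
To illustrate the quoted point about java.io.IOException falling through to
the catch-all, here is a rough Java sketch of a handler with that shape. It is
not Kafka's Processor code (which is written in Scala). The class, the
interface, the returned level strings, and the choice of EOFException as the
one specifically handled exception are all assumptions made for illustration.

```java
import java.io.EOFException;
import java.io.IOException;

public class CatchAllSketch {
    /** A unit of socket work that may fail; stands in for a read or write step. */
    interface SocketWork {
        void run() throws Exception;
    }

    /** Returns the log level that a handler of this (assumed) shape would pick. */
    static String handle(SocketWork work) {
        try {
            work.run();
            return "OK";
        } catch (EOFException e) {
            // Assumed example of an exception the handler knows about.
            return "INFO";
        } catch (Throwable t) {
            // No dedicated IOException branch, so a connection reset lands here
            // and is reported like any other unexpected failure.
            return "ERROR";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(() -> { throw new IOException("Connection reset by peer"); }));
        System.out.println(handle(() -> { throw new EOFException(); }));
    }
}
```

With a handler shaped like this, a reset from a peer that simply went away is
logged at the same level as a genuine bug, which may be why the message looks
more alarming than the consumers' behaviour suggests.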