You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Jun Rao <ju...@confluent.io> on 2015/02/03 06:25:55 UTC

Re: Consumers closing sockets abruptly?

Is there another broker running on that ip? If the replication factor is
larger than 1, the follower will be fetching data from the leader just like
a regular consumer.

Thanks,

Jun

On Tue, Jan 27, 2015 at 9:52 AM, Scott Reynolds <sr...@twilio.com>
wrote:

> On my brokers I am seeing this error log message:
>
> Closing socket for /X because of error (X is the ip address of a consumer)
> > 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> > 2015-01-27_17:32:58.29890       at
> > sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > 2015-01-27_17:32:58.29891       at
> > sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> > 2015-01-27_17:32:58.29892       at
> > sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> > 2015-01-27_17:32:58.29892       at
> > kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> > 2015-01-27_17:32:58.29893       at
> > kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> > 2015-01-27_17:32:58.29893       at
> > kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29894       at
> > kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> > 2015-01-27_17:32:58.29895       at
> > kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29895       at
> > kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> > 2015-01-27_17:32:58.29896       at
> > kafka.network.Processor.write(SocketServer.scala:375)
> > 2015-01-27_17:32:58.29896       at
> > kafka.network.Processor.run(SocketServer.scala:247)
> > 2015-01-27_17:32:58.29897       at java.lang.Thread.run(Thread.java:745)
> >
>
> This is because the Processor doesn't handle java.io.IOException and it
> falls through to the catch all.
>
> My consumers seem actually really happy. So I don't think there is a real
> issue here. But I could use some help figuring out if there is.
>
> We are using the Java consumer like so:
>
> > final ConsumerConnector consumer =
> > kafka.consumer.Consumer.createJavaConsumerConnector(config);
> >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >     topicCountMap.put(topicName, new Integer(1));
> >     final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> >     final KafkaStream<byte[], byte[]> stream =
> > consumerMap.get(topicName).get(0);
> >
>
> and we just iterate over the stream
>
> Questions:
> 1. What class is the one that makes the network connection to the consumer?
> 2. Do legit cases exist where the consumer would close its socket
> connection ? Zookeeper issues ? Consumer too far behind ?
>