Posted to users@kafka.apache.org by Scott Reynolds <sr...@twilio.com> on 2015/01/27 18:52:23 UTC

Consumers closing sockets abruptly?

On my brokers I am seeing this error log message:

Closing socket for /X because of error (X is the IP address of a consumer)
> 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> 2015-01-27_17:32:58.29890       at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> 2015-01-27_17:32:58.29891       at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> 2015-01-27_17:32:58.29892       at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> 2015-01-27_17:32:58.29892       at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> 2015-01-27_17:32:58.29893       at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> 2015-01-27_17:32:58.29893       at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> 2015-01-27_17:32:58.29894       at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> 2015-01-27_17:32:58.29895       at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> 2015-01-27_17:32:58.29895       at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> 2015-01-27_17:32:58.29896       at kafka.network.Processor.write(SocketServer.scala:375)
> 2015-01-27_17:32:58.29896       at kafka.network.Processor.run(SocketServer.scala:247)
> 2015-01-27_17:32:58.29897       at java.lang.Thread.run(Thread.java:745)
>

This is because the Processor doesn't handle java.io.IOException explicitly,
so it falls through to the catch-all handler.
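
To illustrate the fall-through described above, here is a minimal Java sketch.
It is not Kafka's code: the class CatchAllSketch, the SocketWork interface and
the handle method are hypothetical names, and the dedicated EOFException
branch is an assumption made only to show one "expected" case next to the
catch-all. A plain IOException such as "Connection reset by peer" matches no
specific branch, so it ends up in the generic handler and is reported as an
error.

```java
import java.io.EOFException;
import java.io.IOException;

// Hypothetical sketch, not Kafka's implementation.
class CatchAllSketch {
    // Stand-in for the read/write work a network thread does per connection.
    interface SocketWork {
        void run() throws Exception;
    }

    // Runs the work and returns the log line a handler of this shape would emit.
    static String handle(String remote, SocketWork work) {
        try {
            work.run();
            return "ok";
        } catch (EOFException e) {
            // Assumed "expected" case: the peer closed the connection cleanly.
            return "INFO Closing socket connection to " + remote;
        } catch (Throwable t) {
            // Catch-all: any other exception, including a plain IOException.
            return "ERROR Closing socket for " + remote + " because of error: " + t;
        }
    }
}
```

Calling handle("/X", () -> { throw new IOException("Connection reset by peer"); })
returns a line starting with "ERROR Closing socket for /X because of error",
while throwing an EOFException takes the quieter INFO branch.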

My consumers actually seem healthy, so I don't think there is a real
issue here, but I could use some help confirming that.

We are using the Java consumer like so:

> final ConsumerConnector consumer =
>     kafka.consumer.Consumer.createJavaConsumerConnector(config);
> // Request a single stream for the topic.
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topicName, new Integer(1));
> final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>     consumer.createMessageStreams(topicCountMap);
> // Take the only stream created for the topic.
> final KafkaStream<byte[], byte[]> stream =
>     consumerMap.get(topicName).get(0);
>

and we just iterate over the stream

Questions:
1. Which class makes the network connection to the consumer?
2. Do legit cases exist where the consumer would close its socket
connection? Zookeeper issues? Consumer too far behind?
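
For reference, the mechanism behind a "Connection reset by peer" style error
can be reproduced with plain java.net sockets and no Kafka involved. The
sketch below is only an illustration under stated assumptions: ResetDemo and
writerSeesIOException are hypothetical names, and the abrupt close is forced
with SO_LINGER set to 0, which is one way (not necessarily what the consumers
here do) to make the peer send a reset instead of closing gracefully. The
writing side keeps sending until the write fails with an IOException; the
exact exception message depends on the platform.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo using only the JDK; it does not model Kafka itself.
class ResetDemo {
    // Returns true if the writing side gets an IOException after the peer
    // closes its socket abruptly.
    static boolean writerSeesIOException() throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
            try (Socket accepted = server.accept()) {
                // Linger of 0 seconds makes close() discard the connection
                // with a reset rather than a graceful shutdown.
                client.setSoLinger(true, 0);
                client.close();

                OutputStream out = accepted.getOutputStream();
                byte[] chunk = new byte[1024];
                // The first write may still succeed; retry a bounded number of times.
                for (int i = 0; i < 100; i++) {
                    try {
                        out.write(chunk);
                        out.flush();
                    } catch (IOException e) {
                        return true; // e.g. "Connection reset" or "Broken pipe"
                    }
                    Thread.sleep(10);
                }
                return false;
            }
        }
    }
}
```

The point of the sketch is that the writer only learns about the abrupt close
when it next tries to send data, which matches the stack trace above where the
exception surfaces inside a write of a fetch response.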

Re: Consumers closing sockets abruptly?

Posted by Jun Rao <ju...@confluent.io>.
Is there another broker running on that IP? If the replication factor is
larger than 1, the follower will be fetching data from the leader just like
a regular consumer.

Thanks,

Jun
