Posted to users@kafka.apache.org by Chen Wang <ch...@gmail.com> on 2014/11/02 05:46:55 UTC

Consumer keeps losing connection

Hello Folks,
I am using the high-level consumer, and it seems to drop connections
intermittently:

2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error: Received -1 when reading from channel, socket has likely been closed.
2014-11-01 13:34:40 ConsumerFetcherThread [WARN] [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20220; ClientId: campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,18] -> PartitionFetchInfo(1681313989,4194304),[test_topic,21] -> PartitionFetchInfo(141266339,4194304)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
        at kafka.network.BlockingChannel.connect(Unknown Source)
        at kafka.consumer.SimpleConsumer.connect(Unknown Source)
        at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
        at kafka.metrics.KafkaTimer.time(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
        at kafka.metrics.KafkaTimer.time(Unknown Source)
        at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
        at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown Source)
        at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
        at kafka.utils.ShutdownableThread.run(Unknown Source)
2014-11-01 13:34:40 VerifiableProperties [INFO] Verifying properties

or sometimes:
2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error: null
2014-11-01 13:34:40 ConsumerFetcherThread [WARN] [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20222; ClientId: campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,18] -> PartitionFetchInfo(1681313989,4194304)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
        at kafka.network.BlockingChannel.connect(Unknown Source)
        at kafka.consumer.SimpleConsumer.connect(Unknown Source)
        at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
        at kafka.metrics.KafkaTimer.time(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
        at kafka.metrics.KafkaTimer.time(Unknown Source)
        at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
        at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown Source)
        at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
        at kafka.utils.ShutdownableThread.run(Unknown Source)

The config I am using is:
kafka.config.fetch.message.max.bytes = 4194304
kafka.config.group.id = mygroupid
kafka.config.rebalance.backoff.ms = 6000
kafka.config.rebalance.max.retries = 6
kafka.config.zookeeper.connect = brokerlist
kafka.config.zookeeper.session.timeout.ms = 60000
There should not be any network connectivity issue, as all the ZooKeeper
nodes, brokers, and consumers are in the same cluster.
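
For reference, the settings listed above could be assembled in Java roughly as
follows. This is only a sketch: it assumes that "kafka.config." is an
application-level prefix that gets stripped before the values reach the
consumer, and the class name ConsumerSettings, the method build, and the
address "zk1:2181" are made up for illustration. The property keys themselves
are the ones used by the 0.8 high-level consumer.

```java
import java.util.Properties;

// Hypothetical helper: collects the settings from the post into a Properties object.
final class ConsumerSettings {

    // Assumption: the "kafka.config." prefix seen in the post is removed by the
    // application, so only the plain consumer property names are set here.
    static Properties build(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeperConnect);
        props.setProperty("group.id", groupId);
        props.setProperty("fetch.message.max.bytes", "4194304"); // 4 MiB per partition fetch
        props.setProperty("rebalance.backoff.ms", "6000");
        props.setProperty("rebalance.max.retries", "6");
        props.setProperty("zookeeper.session.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // "zk1:2181" is a placeholder; the post only shows the value as "brokerlist".
        Properties props = build("zk1:2181", "mygroupid");
        // Print the settings in sorted key order for easy comparison.
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

With the Kafka 0.8 client on the classpath, such a Properties object would
normally be passed to kafka.consumer.ConsumerConfig. Note that
zookeeper.connect expects ZooKeeper host:port pairs rather than broker
addresses, so it may be worth confirming what "brokerlist" expands to.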

What could be causing the connection reset error? Is it because
ZooKeeper cannot talk to the broker to get the partition info?
Thanks,
Chen

Re: Consumer keeps losing connection

Posted by Chen Wang <ch...@gmail.com>.
When I check the server log, it is also flooded with this exception:

[2014-11-02 10:00:32,284] ERROR Closing socket for /10.93.80.119 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
        at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
        at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
        at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
        at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
        at kafka.network.MultiSend.writeTo(Transmission.scala:102)
        at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
        at kafka.network.MultiSend.writeTo(Transmission.scala:102)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
        at kafka.network.Processor.write(SocketServer.scala:405)
        at kafka.network.Processor.run(SocketServer.scala:265)
        at java.lang.Thread.run(Thread.java:744)


Re: Consumer keeps losing connection

Posted by Jun Rao <ju...@gmail.com>.
It seems that the consumer can't connect to the broker for some reason. Are
there any other errors on the broker? Any issues with the network?
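
One quick way to rule the network in or out is a plain TCP connect from the
consumer host to the broker's listener port. The sketch below uses only the
JDK. The names BrokerProbe and canConnect, the 2000 ms timeout, and the
default "localhost:9092" are illustrative placeholders, not anything taken
from this thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper: checks whether a TCP port accepts connections.
final class BrokerProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", connect timeouts, and unresolved hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder defaults; pass the real broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

If this returns false while the consumer is logging "Connection refused", the
broker process is probably down or not listening on that address. If it
returns true, the drops are more likely intermittent and worth correlating
with the broker log timestamps.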

Thanks,

Jun
