Posted to users@kafka.apache.org by Ahmy Yulrizka <ah...@yulrizka.com> on 2014/01/21 12:29:42 UTC

Possibly leaking socket on ReplicaFetcherThread

We are running 3 Kafka nodes, which serve 4 partitions.
We have been experiencing weird behavior during network outages.

This has happened twice in the last couple of days. The previous outage
took down the whole cluster,
while this time only 2 out of 3 nodes survived: 1 node became the leader of all
partitions, and the other node is in the ISR of only 1 partition (out of 4).

My best guess is that when the network is down, the broker can't connect
to the other brokers to replicate and keeps opening sockets
without closing them. But I'm not entirely sure about this.

Is there any way to mitigate the problem? Or are there any configuration
options to stop this from happening again?


The java/kafka process opens too many socket file descriptors.
Running `lsof -a -p 11818` yields thousands of lines like this:

...
java    11818 kafka 3059u  sock                0,7       0t0 615637305
can't identify protocol
java    11818 kafka 3060u  sock                0,7       0t0 615637306
can't identify protocol
java    11818 kafka 3061u  sock                0,7       0t0 615637307
can't identify protocol
java    11818 kafka 3062u  sock                0,7       0t0 615637308
can't identify protocol
java    11818 kafka 3063u  sock                0,7       0t0 615637309
can't identify protocol
java    11818 kafka 3064u  sock                0,7       0t0 615637310
can't identify protocol
java    11818 kafka 3065u  sock                0,7       0t0 615637311
can't identify protocol
...

I verified that the open sockets were still not closed when I repeated the
command after 2 minutes.


and the kafka log on the broken node shows lots of errors like this:

[2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
kafka.network.Acceptor  - Error in acceptor
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
        at kafka.network.Acceptor.accept(SocketServer.scala:200)
        at kafka.network.Acceptor.run(SocketServer.scala:154)
        at java.lang.Thread.run(Thread.java:701)
[2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
kafka.network.Acceptor  - Error in acceptor
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
        at kafka.network.Acceptor.accept(SocketServer.scala:200)
        at kafka.network.Acceptor.run(SocketServer.scala:154)
        at java.lang.Thread.run(Thread.java:701)
[2014-01-21 04:21:48,811]  64573917 [ReplicaFetcherThread-0-1] INFO
 kafka.consumer.SimpleConsumer  - Reconnect due to socket error: null
[2014-01-21 04:21:48,819]  64573925 [ReplicaFetcherThread-0-1] WARN
 kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-1], Error in
fetch Name: FetchRequest; Version: 0; CorrelationId: 74930218; ClientId:
ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [some-topic,0] ->
PartitionFetchInfo(959825,1048576),[some-topic,3] ->
PartitionFetchInfo(551546,1048576)
java.net.SocketException: Too many open files
        at sun.nio.ch.Net.socket0(Native Method)
        at sun.nio.ch.Net.socket(Net.java:156)
        at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
        at
sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
        at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
        at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
        at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
        at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
        at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
        at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


--
Ahmy Yulrizka
http://ahmy.yulrizka.com
@yulrizka

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Helin Xiang <xk...@gmail.com>.
Sorry for not replying in the thread; please ignore my last email.


Hi, Jun

We experienced a network device problem, which caused all brokers to crash.
After investigating, we found the server log throwing similar exceptions, like

this:

java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:29)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)


and this:

2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL
kafka.server.ReplicaManager  - [Replica Manager on Broker 1]: Error writing
to highwatermark file:
java.io.FileNotFoundException:
/data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
        at java.io.FileWriter.<init>(FileWriter.java:73)
        at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
        at
kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
        at
kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
        at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
        at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)


We counted the occurrences of java.nio.channels.UnresolvedAddressException and
found around 63,000. Since a healthy Kafka broker opens about 2k fds in our
environment, we believe the open fds hit our system's limit of 65535.

So it seems the bug is not fixed.

After checking the code, we believe it can still leak socket fds.
===============================================
our guess:

in SimpleConsumer.scala:

  private def disconnect() = {
    if(blockingChannel.isConnected) {
      debug("Disconnecting from " + host + ":" + port)
      blockingChannel.disconnect()
    }
  }

but when the exception happens, blockingChannel.isConnected is false,
because in BlockingChannel.scala:

  def connect() = lock synchronized  {
    if(!connected) {
      channel = SocketChannel.open()
      if(readBufferSize > 0)
        channel.socket.setReceiveBufferSize(readBufferSize)
      if(writeBufferSize > 0)
        channel.socket.setSendBufferSize(writeBufferSize)
      channel.configureBlocking(true)
      channel.socket.setSoTimeout(readTimeoutMs)
      channel.socket.setKeepAlive(true)
      channel.socket.setTcpNoDelay(true)
      channel.connect(new InetSocketAddress(host, port))   <-- exception
happens here

      writeChannel = channel
      readChannel = Channels.newChannel(channel.socket().getInputStream)
      connected = true   <-- never reached on failure, so connected stays
false and the channel's fd is never closed
      ... ...
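The leak described above can be sketched in a few lines of Java (a minimal, hypothetical illustration, not Kafka's actual classes): `SocketChannel.open()` allocates a file descriptor *before* `connect()`, so if `connect()` throws and the `connected` flag was never set, a `disconnect()` guarded by that flag silently leaks the fd.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class LeakSketch {
    private SocketChannel channel;
    private boolean connected = false;

    // Mirrors the BlockingChannel.connect() logic quoted above (leak-prone).
    public void connectLeaky(InetSocketAddress addr) throws IOException {
        channel = SocketChannel.open();   // fd allocated here
        channel.connect(addr);            // UnresolvedAddressException thrown here on DNS failure
        connected = true;                 // never reached on failure
    }

    // Mirrors SimpleConsumer.disconnect(): a no-op when connected == false.
    public void disconnect() throws IOException {
        if (connected) {
            channel.close();
            connected = false;
        }
    }

    // One possible fix: close the channel on *any* connect failure. Note that
    // UnresolvedAddressException is a RuntimeException, not an IOException.
    public void connectSafe(InetSocketAddress addr) throws IOException {
        channel = SocketChannel.open();
        try {
            channel.connect(addr);
            connected = true;
        } catch (IOException | RuntimeException e) {
            channel.close();              // release the fd before rethrowing
            throw e;
        }
    }

    public boolean isChannelOpen() {
        return channel != null && channel.isOpen();
    }
}
```

Connecting `connectLeaky` to an unresolvable address leaves the channel (and its fd) open forever, matching the "can't identify protocol" socks in the lsof output; `connectSafe` releases it.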


Thanks.
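Further down the thread Jun suggests disconnecting on any exception (the direction taken in KAFKA-1228). A hypothetical Java sketch of that retry pattern, with an invented Channel interface rather than Kafka's actual API, could look like:

```java
import java.io.IOException;

public class SafeSender {
    // Invented for illustration; stands in for the blocking channel used by the fetcher.
    public interface Channel {
        void connect() throws IOException;
        String sendAndReceive(String request) throws IOException;
        void disconnect();
    }

    public static String sendRequest(Channel ch, String request) throws IOException {
        try {
            return ch.sendAndReceive(request);
        } catch (Exception first) {    // not just IOException: this also catches
            ch.disconnect();           // UnresolvedAddressException; always free the old socket
            try {
                ch.connect();          // retry once on a fresh connection
                return ch.sendAndReceive(request);
            } catch (Exception second) {
                ch.disconnect();       // free the fd on the retry failure as well
                throw second instanceof IOException
                        ? (IOException) second : new IOException(second);
            }
        }
    }
}
```

The key difference from the sendRequest() quoted later in the thread is that the catch clauses are not limited to java.io.IOException, so an UnresolvedAddressException during a network outage still triggers disconnect() instead of leaving the half-opened socket behind.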

On Thu, Feb 13, 2014 at 1:33 PM, Jun Rao <ju...@gmail.com> wrote:

> This is fixed in https://issues.apache.org/jira/browse/KAFKA-1228 and will
> be included in 0.8.1 release.
>
> Thanks,
>
> Jun
>
>
> On Wed, Feb 12, 2014 at 6:28 PM, Priya Matpadi
> <pr...@ecofactor.com>wrote:
>
> > Hello,
> > Is there any progress on this issue? We also experience a socket leak in
> > case of network outage.
> > Thanks,
> > Priya
> >
> >
> > On Fri, Jan 24, 2014 at 7:30 AM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Thanks for finding this out. We probably should disconnect on any
> exception.
> > > Could you file a jira and perhaps attach a patch?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Jan 24, 2014 at 6:06 AM, Ahmy Yulrizka <ah...@yulrizka.com>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I think I found the problem.
> > > >
> > > > this is part of the stack trace. First I think there was a connection
> > > > problem, and when the connection was restored it got new information
> > > > from zookeeper
> > > >
> > > > [2014-01-23 23:24:55,391] INFO Opening socket connection to server
> > > > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > > > [2014-01-23 23:24:55,391] INFO Socket connection established to
> > > > host2.provider.com/2.2.2.2:2181, initiating session
> > > > (org.apache.zookeeper.ClientCnxn)
> > > > [2014-01-23 23:24:55,391] DEBUG Session establishment request sent on
> > > > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > > > [2014-01-23 23:24:57,393] INFO Client session timed out, have not
> heard
> > > > from server in 2002ms for sessionid 0x0, closing socket connection
> and
> > > > attempting reconnect (org.apache.zookeeper.ClientCnxn)
> > > > [2014-01-23 23:24:58,047] INFO Opening socket connection to server
> > > > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > > > [2014-01-23 23:24:59,048] INFO Socket connection established to
> > > > host3.provider.com/3.3.3.3:2181, initiating session
> > > > (org.apache.zookeeper.ClientCnxn)
> > > > [2014-01-23 23:24:59,049] DEBUG Session establishment request sent on
> > > > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > > > [2014-01-23 23:24:59,539] INFO Session establishment complete on
> server
> > > > host3.provider.com/3.3.3.3:2181, sessionid = 0x343c13436e50001,
> > > negotiated
> > > > timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> > > > [2014-01-23 23:24:59,539] DEBUG Received event: WatchedEvent
> > > > state:SyncConnected type:None path:null
> (org.I0Itec.zkclient.ZkClient)
> > > > [2014-01-23 23:24:59,539] INFO zookeeper state changed
> (SyncConnected)
> > > > (org.I0Itec.zkclient.ZkClient)
> > > > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > > > SyncConnected sent to
> > > > kafka.server.KafkaZooKeeper$SessionExpireListener@163e1f27]
> > > > (org.I0Itec.zkclient.ZkEventThread)
> > > > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > > > SyncConnected sent to
> > > > kafka.controller.KafkaController$SessionExpirationListener@486f44d9]
> > > > (org.I0Itec.zkclient.ZkEventThread)
> > > > [2014-01-23 23:24:59,539] DEBUG Leaving process event
> > > > (org.I0Itec.zkclient.ZkClient)
> > > > [2014-01-23 23:24:59,539] DEBUG State is SyncConnected
> > > > (org.I0Itec.zkclient.ZkClient)
> > > > [2014-01-23 23:24:59,540] DEBUG State is SyncConnected
> > > > (org.I0Itec.zkclient.ZkClient)
> > > >
> > > > Then the ReplicaFetcherThread tries to reconnect.
> > > > At this point it tries to connect to the other brokers, but it can't
> > > > resolve the IP address and throws:
> > > > "java.nio.channels.UnresolvedAddressException"
> > > >
> > > > [2014-01-23 23:25:01,067] WARN [ReplicaFetcherThread-0-1], Error in
> > fetch
> > > > Name: FetchRequest; Version: 0; CorrelationId: 478411; ClientId:
> > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > > bytes;
> > > > RequestInfo: [some-topic,0] ->
> > > > PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> > > > PartitionFetchInfo(839677,1048576)
> (kafka.server.ReplicaFetcherThread)
> > > > java.nio.channels.UnresolvedAddressException
> > > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > > >  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > >  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > > at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > at
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > >  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-01-23 23:25:01,077] INFO Reconnect due to socket error: null
> > > > (kafka.consumer.SimpleConsumer)
> > > > [2014-01-23 23:25:01,078] WARN [ReplicaFetcherThread-0-1], Error in
> > fetch
> > > > Name: FetchRequest; Version: 0; CorrelationId: 478412; ClientId:
> > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > > bytes;
> > > > RequestInfo: [some-topic,0] ->
> > > > PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> > > > PartitionFetchInfo(839677,1048576)
> (kafka.server.ReplicaFetcherThread)
> > > > java.nio.channels.UnresolvedAddressException
> > > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > > >  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > >  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > > at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > at
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > >  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-01-23 23:25:01,079] INFO Reconnect due to socket error: null
> > > > (kafka.consumer.SimpleConsumer)
> > > >
> > > >
> > > > it throws a bunch of these errors until "too many open files"...
> > > >
> > > > [2014-01-23 23:25:03,756] INFO Reconnect due to socket error: null
> > > > (kafka.consumer.SimpleConsumer)
> > > > [2014-01-23 23:25:03,756] WARN [ReplicaFetcherThread-0-1], Error in
> > fetch
> > > > Name: FetchRequest; Version: 0; CorrelationId: 482406; ClientId:
> > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > > bytes;
> > > > RequestInfo: [some-topic,0] ->
> > > > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > > > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > > > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > > > PartitionFetchInfo(1273826,1048576)
> (kafka.server.ReplicaFetcherThread)
> > > > java.nio.channels.UnresolvedAddressException
> > > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > > >  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > >  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > > at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > at
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > >  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > > > (kafka.consumer.SimpleConsumer)
> > > > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in
> > fetch
> > > > Name: FetchRequest; Version: 0; CorrelationId: 482407; ClientId:
> > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > > bytes;
> > > > RequestInfo: [some-topic,0] ->
> > > > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > > > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > > > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > > > PartitionFetchInfo(1273826,1048576)
> (kafka.server.ReplicaFetcherThread)
> > > > java.net.SocketException: Too many open files
> > > > at sun.nio.ch.Net.socket0(Native Method)
> > > > at sun.nio.ch.Net.socket(Net.java:156)
> > > >  at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > > at
> > > >
> > > >
> > >
> >
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > >  at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > >  at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > >  at
> > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > >  at
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > > > (kafka.consumer.SimpleConsumer)
> > > > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in
> > fetch
> > > > Name: FetchRequest; Version: 0; CorrelationId: 482408; ClientId:
> > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > > bytes;
> > > > RequestInfo: [some-topic,0] ->
> > > > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > > > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > > > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > > > PartitionFetchInfo(1273826,1048576)
> (kafka.server.ReplicaFetcherThread)
> > > > java.net.SocketException: Too many open files
> > > > at sun.nio.ch.Net.socket0(Native Method)
> > > > at sun.nio.ch.Net.socket(Net.java:156)
> > > >  at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > > at
> > > >
> > > >
> > >
> >
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > >  at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > >  at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > >  at
> > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > >  at
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-01-23 23:25:03,758] INFO Reconnect due to socket error: null
> > > > (kafka.consumer.SimpleConsumer)
> > > >
> > > >
> > > >
> > > > I looked into the source code of
> > > > core/src/main/scala/kafka/consumer/SimpleConsumer.scala line 79.
> > > >
> > > > It tries to reconnect(), but it only closes the connection on
> > > > "java.io.IOException", which does not catch
> > > > "java.nio.channels.UnresolvedAddressException":
> > > >
> > > >   private def sendRequest(request: RequestOrResponse): Receive = {
> > > >     lock synchronized {
> > > >       getOrMakeConnection()
> > > >       var response: Receive = null
> > > >       try {
> > > >         blockingChannel.send(request)
> > > >         response = blockingChannel.receive()
> > > >       } catch {
> > > >         case e : java.io.IOException =>
> > > >           info("Reconnect due to socket error:
> > %s".format(e.getMessage))
> > > >           // retry once
> > > >           try {
> > > >             reconnect()
> > > >             blockingChannel.send(request)
> > > >             response = blockingChannel.receive()
> > > >           } catch {
> > > >             case ioe: java.io.IOException =>
> > > >               disconnect()
> > > >               throw ioe
> > > >           }
> > > >         case e: Throwable => throw e
> > > >       }
> > > >       response
> > > >     }
> > > >   }
> > > >
> > > > This is my production setting
> > > >
> > > > OS: Ubuntu 12.04
> > > > kafka : kafka_2.8.0-0.8.0.jar
> > > >
> > > > java :
> > > > java version "1.6.0_27"
> > > > OpenJDK Runtime Environment (IcedTea6 1.12.6)
> > > > (6b27-1.12.6-1ubuntu0.12.04.4)
> > > > OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
> > > >
> > > > An interesting fact: if I close one of the file descriptors of the
> > > > process using
> > > >
> > > > $ gdb -p KAFKA_PID
> > > > > call close(4567)
> > > >
> > > > it reuses that file descriptor and establishes a connection to a consumer:
> > > >
> > > > java       9708      kafka 4087u     sock                0,7
> > 0t0
> > > > 3258461771 can't identify protocol
> > > > java       9708      kafka 4088u     IPv4         3441430493
> > 0t0
> > > >      TCP host2.provider.com:9092->consumer.host.com:38208
> (ESTABLISHED)
> > > > java       9708      kafka 4089u     sock                0,7
> > 0t0
> > > > 3258461773 can't identify protocol
> > > > java       9708      kafka 4090u     sock                0,7
> > 0t0
> > > > 3258461774 can't identify protocol
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Ahmy Yulrizka
> > > > http://ahmy.yulrizka.com
> > > > @yulrizka
> > > >
> > > >
> > > > On Wed, Jan 22, 2014 at 5:41 AM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > Hmm, without knowing the client ip, it's hard to tell whether those
> > are
> > > > > from replication fetcher threads or not. Are most of those
> > connections
> > > in
> > > > > established mode?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Tue, Jan 21, 2014 at 8:06 AM, Ahmy Yulrizka <ah...@yulrizka.com>
> > > > wrote:
> > > > >
> > > > > > these are the lines I copied from lsof:
> > > > > >
> > > > > > ...
> > > > > > java      11818      kafka   98u     sock                0,7
> > > 0t0
> > > > > >  615628183 can't identify protocol
> > > > > > java      11818      kafka   99u     IPv4          615077352
> > > 0t0
> > > > > >    TCP somedomain.com:9092->
> 121-123-123-123.someprovider.net:37547
> > > > > > (CLOSE_WAIT)
> > > > > > java      11818      kafka  100u     IPv4          615077353
> > > 0t0
> > > > > >    TCP somedomain.com:9092->
> 121-123-123-123.someprovider.net:37553
> > > > > > (ESTABLISHED)
> > > > > > java      11818      kafka  101u     sock                0,7
> > > 0t0
> > > > > >  615628184 can't identify protocol
> > > > > > java      11818      kafka  102u     sock                0,7
> > > 0t0
> > > > > >  615628185 can't identify protocol
> > > > > > java      11818      kafka  103u     sock                0,7
> > > 0t0
> > > > > >  615628186 can't identify protocol
> > > > > > ...
> > > > > >
> > > > > > As you can see from the output, I can see the connection state on
> > > > > > some of the TCP sockets, but the plain socks only give "can't
> > > > > > identify protocol", so I cannot tell where these socks originate.
> > > > > >
> > > > > > I also could not see the connections when I ran netstat -nat
> > > > > >
> > > > > > --
> > > > > > Ahmy Yulrizka
> > > > > > http://ahmy.yulrizka.com
> > > > > > @yulrizka
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 21, 2014 at 4:42 PM, Jun Rao <ju...@gmail.com>
> wrote:
> > > > > >
> > > > > > > What mode are those sockets in (established, closed, etc)?
> Also,
> > > from
> > > > > the
> > > > > > > ip, could you tell whether those sockets are from the client or
> > > from
> > > > > the
> > > > > > > replica fetcher in the brokers.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jan 21, 2014 at 3:29 AM, Ahmy Yulrizka <
> > ahmy@yulrizka.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > We are running 3 kafka nodes, which servers 4 partition.
> > > > > > > > We have been experiencing weird behavior during network
> outage.
> > > > > > > >
> > > > > > > > we had been experiencing twice in the last couple of days.
> the
> > > > > previous
> > > > > > > one
> > > > > > > > took down all of the cluster.
> > > > > > > > while this one only 2 out of 3 survive. and 1 node became the
> > > > leader
> > > > > of
> > > > > > > all
> > > > > > > > partition, and other node only in ISR of 1 partition (out of
> 4)
> > > > > > > >
> > > > > > > > my best guess now is that when the network down, the broker
> > can't
> > > > > > connect
> > > > > > > > to other broker to do replication and keep opening the socket
> > > > > > > > without closing it. But I'm not entirely sure about this.
> > > > > > > >
> > > > > > > > Is there any way to mitigate the problem ? or is there any
> > > > > > configuration
> > > > > > > > options to stop this from happening again ?
> > > > > > > >
> > > > > > > >
> > > > > > > > The java/kafka process open too many socket file descriptor.
> > > > > > > > running `lsof -a -p 11818` yield thousand of this line.
> > > > > > > >
> > > > > > > > ...
> > > > > > > > java    11818 kafka 3059u  sock                0,7       0t0
> > > > > 615637305
> > > > > > > > can't identify protocol
> > > > > > > > java    11818 kafka 3060u  sock                0,7       0t0
> > > > > 615637306
> > > > > > > > can't identify protocol
> > > > > > > > java    11818 kafka 3061u  sock                0,7       0t0
> > > > > 615637307
> > > > > > > > can't identify protocol
> > > > > > > > java    11818 kafka 3062u  sock                0,7       0t0
> > > > > 615637308
> > > > > > > > can't identify protocol
> > > > > > > > java    11818 kafka 3063u  sock                0,7       0t0
> > > > > 615637309
> > > > > > > > can't identify protocol
> > > > > > > > java    11818 kafka 3064u  sock                0,7       0t0
> > > > > 615637310
> > > > > > > > can't identify protocol
> > > > > > > > java    11818 kafka 3065u  sock                0,7       0t0
> > > > > 615637311
> > > > > > > > can't identify protocol
> > > > > > > > ...
> > > > > > > >
> > > > > > > > I verified that the open sockets did not close when I repeated
> > > > > > > > the command after 2 minutes.
> > > > > > > >
> > > > > > > >
> > > > > > > > And the Kafka log on the broken node generates lots of errors
> > > > > > > > like this:
> > > > > > > >
> > > > > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > > > > java.io.IOException: Too many open files
> > > > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native
> > > > Method)
> > > > > > > >         at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > > > > >         at
> > kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > > > > java.io.IOException: Too many open files
> > > > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native
> > > > Method)
> > > > > > > >         at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > > > > >         at
> > kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > > > > [2014-01-21 04:21:48,811]  64573917
> [ReplicaFetcherThread-0-1]
> > > INFO
> > > > > > > >  kafka.consumer.SimpleConsumer  - Reconnect due to socket
> > error:
> > > > null
> > > > > > > > [2014-01-21 04:21:48,819]  64573925
> [ReplicaFetcherThread-0-1]
> > > WARN
> > > > > > > >  kafka.server.ReplicaFetcherThread  -
> > [ReplicaFetcherThread-0-1],
> > > > > Error
> > > > > > > in
> > > > > > > > fetch Name: FetchRequest; Version: 0; CorrelationId:
> 74930218;
> > > > > > ClientId:
> > > > > > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms;
> > > MinBytes:
> > > > 1
> > > > > > > bytes;
> > > > > > > > RequestInfo: [some-topic,0] ->
> > > > > > > > PartitionFetchInfo(959825,1048576),[some-topic,3] ->
> > > > > > > > PartitionFetchInfo(551546,1048576)
> > > > > > > > java.net.SocketException: Too many open files
> > > > > > > >         at sun.nio.ch.Net.socket0(Native Method)
> > > > > > > >         at sun.nio.ch.Net.socket(Net.java:156)
> > > > > > > >         at
> > > > > > > sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > > > > > >         at
> > > > > java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > > > > > >         at
> > > > > > > kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > > > > > >         at
> > > > > > kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > > > > > >         at
> > > > > > >
> kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > > > > > >         at
> > > > > > > >
> > > > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > > > >         at
> > > > > > kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > > > > >         at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > > > > >         at
> > > > > > > >
> > > > > >
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > > > > > >         at
> > > > > > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Ahmy Yulrizka
> > > > > > > > http://ahmy.yulrizka.com
> > > > > > > > @yulrizka
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>



-- 


Best Regards,
Xiang Helin

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Jun Rao <ju...@gmail.com>.
This is fixed in https://issues.apache.org/jira/browse/KAFKA-1228 and will
be included in the 0.8.1 release.

Thanks,

Jun
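
The fix concerns the reconnect path discussed further down this thread: in 0.8.0, SimpleConsumer only disconnected on java.io.IOException, so an UnresolvedAddressException left the freshly opened socket behind. Below is a minimal, self-contained sketch of the "disconnect on any exception" idea; FakeChannel and LeakFreeConsumer are made-up illustration names, not Kafka classes, and the actual KAFKA-1228 patch may differ in detail.

```scala
// Sketch only: models the leak from this thread, not Kafka's real API.
// UnresolvedAddressException extends IllegalArgumentException, NOT
// IOException, so a catch limited to IOException never releases the socket.
import java.nio.channels.UnresolvedAddressException

class FakeChannel {
  var connected = false
  def connect(): Unit = { connected = true }
  def disconnect(): Unit = { connected = false }
  // Simulate a broker whose hostname no longer resolves.
  def send(): Unit = throw new UnresolvedAddressException
}

class LeakFreeConsumer {
  val channel = new FakeChannel
  def sendRequest(): Unit = {
    channel.connect()
    try {
      channel.send()
    } catch {
      // Disconnect on ANY exception, not just IOException, so the file
      // descriptor is always released before the error is rethrown.
      case e: Throwable =>
        channel.disconnect()
        throw e
    }
  }
}
```

With this shape, a send against an unresolvable "broker" still fails, but the channel is closed first, so no file descriptor is leaked per retry.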


On Wed, Feb 12, 2014 at 6:28 PM, Priya Matpadi
<pr...@ecofactor.com>wrote:

> Hello,
> Is there any progress on this issue? We also experience a socket leak in
> case of a network outage.
> Thanks,
> Priya
>
>
> On Fri, Jan 24, 2014 at 7:30 AM, Jun Rao <ju...@gmail.com> wrote:
>
> > Thanks for finding this out. We probably should disconnect on any exception.
> > Could you file a jira and perhaps attach a patch?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Jan 24, 2014 at 6:06 AM, Ahmy Yulrizka <ah...@yulrizka.com>
> wrote:
> >
> > > Hi,
> > >
> > > I think I found the problem.
> > >
> > > This is part of the stack trace. First, I think there is a connection
> > > problem, and when the connection is restored it gets new information
> > > from ZooKeeper:
> > >
> > > [2014-01-23 23:24:55,391] INFO Opening socket connection to server
> > > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:55,391] INFO Socket connection established to
> > > host2.provider.com/2.2.2.2:2181, initiating session
> > > (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:55,391] DEBUG Session establishment request sent on
> > > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:57,393] INFO Client session timed out, have not heard
> > > from server in 2002ms for sessionid 0x0, closing socket connection and
> > > attempting reconnect (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:58,047] INFO Opening socket connection to server
> > > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:59,048] INFO Socket connection established to
> > > host3.provider.com/3.3.3.3:2181, initiating session
> > > (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:59,049] DEBUG Session establishment request sent on
> > > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:59,539] INFO Session establishment complete on server
> > > host3.provider.com/3.3.3.3:2181, sessionid = 0x343c13436e50001,
> > negotiated
> > > timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:59,539] DEBUG Received event: WatchedEvent
> > > state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
> > > [2014-01-23 23:24:59,539] INFO zookeeper state changed (SyncConnected)
> > > (org.I0Itec.zkclient.ZkClient)
> > > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > > SyncConnected sent to
> > > kafka.server.KafkaZooKeeper$SessionExpireListener@163e1f27]
> > > (org.I0Itec.zkclient.ZkEventThread)
> > > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > > SyncConnected sent to
> > > kafka.controller.KafkaController$SessionExpirationListener@486f44d9]
> > > (org.I0Itec.zkclient.ZkEventThread)
> > > [2014-01-23 23:24:59,539] DEBUG Leaving process event
> > > (org.I0Itec.zkclient.ZkClient)
> > > [2014-01-23 23:24:59,539] DEBUG State is SyncConnected
> > > (org.I0Itec.zkclient.ZkClient)
> > > [2014-01-23 23:24:59,540] DEBUG State is SyncConnected
> > > (org.I0Itec.zkclient.ZkClient)
> > >
> > > Then the ReplicaFetcherThread tries to reconnect.
> > > At this point it tries to connect to the other brokers, but it can't
> > > resolve the IP address and throws
> > > "java.nio.channels.UnresolvedAddressException"
> > >
> > > [2014-01-23 23:25:01,067] WARN [ReplicaFetcherThread-0-1], Error in
> fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 478411; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > bytes;
> > > RequestInfo: [some-topic,0] ->
> > > PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> > > PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> > > java.nio.channels.UnresolvedAddressException
> > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > >  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > >  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > >  at
> > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > at
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > >  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:01,077] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > > [2014-01-23 23:25:01,078] WARN [ReplicaFetcherThread-0-1], Error in
> fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 478412; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > bytes;
> > > RequestInfo: [some-topic,0] ->
> > > PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> > > PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> > > java.nio.channels.UnresolvedAddressException
> > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > >  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > >  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > >  at
> > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > at
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > >  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:01,079] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > >
> > >
> > > It throws a bunch of these errors until "Too many open files":
> > >
> > > [2014-01-23 23:25:03,756] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > > [2014-01-23 23:25:03,756] WARN [ReplicaFetcherThread-0-1], Error in
> fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 482406; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > bytes;
> > > RequestInfo: [some-topic,0] ->
> > > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > > PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > > java.nio.channels.UnresolvedAddressException
> > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > >  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > >  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > >  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > >  at
> > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > at
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > >  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in
> fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 482407; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > bytes;
> > > RequestInfo: [some-topic,0] ->
> > > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > > PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > > java.net.SocketException: Too many open files
> > > at sun.nio.ch.Net.socket0(Native Method)
> > > at sun.nio.ch.Net.socket(Net.java:156)
> > >  at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > at
> > >
> > >
> >
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > >  at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > >  at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > >  at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > at
> > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > >  at
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in
> fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 482408; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > bytes;
> > > RequestInfo: [some-topic,0] ->
> > > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > > PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > > java.net.SocketException: Too many open files
> > > at sun.nio.ch.Net.socket0(Native Method)
> > > at sun.nio.ch.Net.socket(Net.java:156)
> > >  at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > at
> > >
> > >
> >
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > >  at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > >  at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > >  at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > >  at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > at
> > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > >  at
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:03,758] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > >
> > >
> > >
> > > I looked into the source code of
> > > core/src/main/scala/kafka/consumer/SimpleConsumer.scala, line 79.
> > >
> > > It tries to reconnect(), but it only closes the connection on
> > > "java.io.IOException", which does not cover
> > > "java.nio.channels.UnresolvedAddressException":
> > >
> > >   private def sendRequest(request: RequestOrResponse): Receive = {
> > >     lock synchronized {
> > >       getOrMakeConnection()
> > >       var response: Receive = null
> > >       try {
> > >         blockingChannel.send(request)
> > >         response = blockingChannel.receive()
> > >       } catch {
> > >         case e : java.io.IOException =>
> > >           info("Reconnect due to socket error:
> %s".format(e.getMessage))
> > >           // retry once
> > >           try {
> > >             reconnect()
> > >             blockingChannel.send(request)
> > >             response = blockingChannel.receive()
> > >           } catch {
> > >             case ioe: java.io.IOException =>
> > >               disconnect()
> > >               throw ioe
> > >           }
> > >         case e: Throwable => throw e
> > >       }
> > >       response
> > >     }
> > >   }
> > >
> > > This is my production setup:
> > >
> > > OS: Ubuntu 12.04
> > > kafka : kafka_2.8.0-0.8.0.jar
> > >
> > > java :
> > > java version "1.6.0_27"
> > > OpenJDK Runtime Environment (IcedTea6 1.12.6)
> > > (6b27-1.12.6-1ubuntu0.12.04.4)
> > > OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
> > >
> > > An interesting fact: if I close one of the file descriptors of the
> > > process using
> > >
> > > $ gdb -p KAFKA_PID
> > > > call close(4567)
> > >
> > > it reuses that file descriptor and establishes a connection to a consumer:
> > >
> > > java       9708      kafka 4087u     sock                0,7
> 0t0
> > > 3258461771 can't identify protocol
> > > java       9708      kafka 4088u     IPv4         3441430493
> 0t0
> > >      TCP host2.provider.com:9092->consumer.host.com:38208(ESTABLISHED)
> > > java       9708      kafka 4089u     sock                0,7
> 0t0
> > > 3258461773 can't identify protocol
> > > java       9708      kafka 4090u     sock                0,7
> 0t0
> > > 3258461774 can't identify protocol
> > >
> > >
> > >
> > >
> > > --
> > > Ahmy Yulrizka
> > > http://ahmy.yulrizka.com
> > > @yulrizka
> > >
> > >
> > > On Wed, Jan 22, 2014 at 5:41 AM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > Hmm, without knowing the client IP, it's hard to tell whether those
> > > > are from replica fetcher threads or not. Are most of those
> > > > connections in ESTABLISHED mode?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Tue, Jan 21, 2014 at 8:06 AM, Ahmy Yulrizka <ah...@yulrizka.com>
> > > wrote:
> > > >
> > > > > These are the lines I copied from lsof:
> > > > >
> > > > > ...
> > > > > java      11818      kafka   98u     sock                0,7
> > 0t0
> > > > >  615628183 can't identify protocol
> > > > > java      11818      kafka   99u     IPv4          615077352
> > 0t0
> > > > >    TCP somedomain.com:9092->121-123-123-123.someprovider.net:37547
> > > > > (CLOSE_WAIT)
> > > > > java      11818      kafka  100u     IPv4          615077353
> > 0t0
> > > > >    TCP somedomain.com:9092->121-123-123-123.someprovider.net:37553
> > > > > (ESTABLISHED)
> > > > > java      11818      kafka  101u     sock                0,7
> > 0t0
> > > > >  615628184 can't identify protocol
> > > > > java      11818      kafka  102u     sock                0,7
> > 0t0
> > > > >  615628185 can't identify protocol
> > > > > java      11818      kafka  103u     sock                0,7
> > 0t0
> > > > >  615628186 can't identify protocol
> > > > > ...
> > > > >
> > > > > As you can see from the output, I could see the connection state on
> > > > > some of the TCP sockets, but the plain sock entries only give
> > > > > "can't identify protocol", so I could not see where these sockets
> > > > > originate.
> > > > >
> > > > > I also could not see these connections when I ran netstat -nat.
> > > > >
> > > > > --
> > > > > Ahmy Yulrizka
> > > > > http://ahmy.yulrizka.com
> > > > > @yulrizka
> > > > > > >         at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > > > >         at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > > >         at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > > >         at
> > > > > kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > > > >         at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > > > >         at
> > > > > > >
> > > > >
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > > > > >         at
> > > > > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Ahmy Yulrizka
> > > > > > > http://ahmy.yulrizka.com
> > > > > > > @yulrizka
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Priya Matpadi <pr...@ecofactor.com>.
Furthermore, the problem is not restricted to ReplicaFetcherThread: the Kafka
consumer also leaks sockets, since its SendThread goes through the same code.
See the stack trace below:

2014-01-23 06:48:09,699 INFO  [org.apache.zookeeper.ClientCnxn]
(OurKafkaMessageFetcher-blah1-SendThread(pkafka3.our.com:2181)) Opening
socket connection to server pkafka2.our.com/10.58.0.191:2181
2014-01-23 06:48:10,124 INFO  [org.apache.zookeeper.ClientCnxn]
(OurKafkaMessageFetcher-blah2-SendThread(pkafka1.our.com:2181)) Opening
socket connection to server pkafka2.our.com/10.58.0.191:2181
2014-01-23 06:48:10,272 INFO  [org.apache.zookeeper.ClientCnxn]
(OurKafkaMessageFetcher-blah3-SendThread(pkafka2.our.com:2181)) Opening
socket connection to server 10.58.0.190/10.58.0.190:2181
2014-01-23 06:48:11,033 INFO  [org.apache.zookeeper.ClientCnxn]
(OurKafkaMessageFetcher-blah1-SendThread(pkafka2.our.com:2181)) Client
session timed out, have not heard from server in 3257ms for sessionid
0x3436ae2b16a0071, closing socket connection and attempting reconnect
2014-01-23 06:48:11,070 WARN  [kafka.consumer.ConsumerFetcherThread]
(ConsumerFetcherThread-blah1KafkaGroup_pjb1.our.com-1390441665650-635d2718-0-2)
[ConsumerFetcherThread-blah1KafkaGroup_pjb1.our.com-1390441665650-635d2718-0-2],
Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 176434;
ClientId:
blah1KafkaGroup-ConsumerFetcherThread-blah1KafkaGroup_pjb1.our.com-1390441665650-635d2718-0-2;
ReplicaId: -1; MaxWait: 100
 ms; MinBytes: 1 bytes; RequestInfo: [blah,0] ->
PartitionFetchInfo(0,1048576),[blah,4] -> PartitionFetchInfo(0,1048576)
java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:30)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
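Both traces share the same failure mode: the reconnect logic only cleans up on
IOException, but java.nio.channels.UnresolvedAddressException is not an
IOException at all — it extends IllegalArgumentException. A standalone Java
check (illustrative code, not Kafka source) confirms the hierarchy:

```java
import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;

public class CatchDemo {
    public static void main(String[] args) {
        Throwable t = new UnresolvedAddressException();
        // An IOException handler never fires for this exception, so any
        // cleanup (disconnect/close) guarded by it is skipped.
        System.out.println(t instanceof IOException);              // prints "false"
        System.out.println(t instanceof IllegalArgumentException); // prints "true"
    }
}
```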

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Priya Matpadi <pr...@ecofactor.com>.
Hello,
Is there any progress on this issue? We also experience a socket leak in the
case of a network outage.
Thanks,
Priya


On Fri, Jan 24, 2014 at 7:30 AM, Jun Rao <ju...@gmail.com> wrote:

> Thanks for finding this out. We probably should disconnect on any exception.
> Could you file a jira and perhaps attach a patch?
>
> Thanks,
>
> Jun
>
>
> On Fri, Jan 24, 2014 at 6:06 AM, Ahmy Yulrizka <ah...@yulrizka.com> wrote:
>
> > Hi,
> >
> > I think I found the problem.
> >
> > This is part of the stack trace. First there is a connection problem, and
> > when the connection is restored it gets fresh information from ZooKeeper:
> >
> > [2014-01-23 23:24:55,391] INFO Opening socket connection to server
> > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:55,391] INFO Socket connection established to
> > host2.provider.com/2.2.2.2:2181, initiating session
> > (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:55,391] DEBUG Session establishment request sent on
> > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:57,393] INFO Client session timed out, have not heard
> > from server in 2002ms for sessionid 0x0, closing socket connection and
> > attempting reconnect (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:58,047] INFO Opening socket connection to server
> > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:59,048] INFO Socket connection established to
> > host3.provider.com/3.3.3.3:2181, initiating session
> > (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:59,049] DEBUG Session establishment request sent on
> > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:59,539] INFO Session establishment complete on server
> > host3.provider.com/3.3.3.3:2181, sessionid = 0x343c13436e50001,
> negotiated
> > timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:59,539] DEBUG Received event: WatchedEvent
> > state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
> > [2014-01-23 23:24:59,539] INFO zookeeper state changed (SyncConnected)
> > (org.I0Itec.zkclient.ZkClient)
> > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > SyncConnected sent to
> > kafka.server.KafkaZooKeeper$SessionExpireListener@163e1f27]
> > (org.I0Itec.zkclient.ZkEventThread)
> > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > SyncConnected sent to
> > kafka.controller.KafkaController$SessionExpirationListener@486f44d9]
> > (org.I0Itec.zkclient.ZkEventThread)
> > [2014-01-23 23:24:59,539] DEBUG Leaving process event
> > (org.I0Itec.zkclient.ZkClient)
> > [2014-01-23 23:24:59,539] DEBUG State is SyncConnected
> > (org.I0Itec.zkclient.ZkClient)
> > [2014-01-23 23:24:59,540] DEBUG State is SyncConnected
> > (org.I0Itec.zkclient.ZkClient)
> >
> > Then the ReplicaFetcherThread tries to reconnect. At this point it tries to
> > connect to the other brokers, but it can't resolve their IP addresses and
> > throws "java.nio.channels.UnresolvedAddressException":
> >
> > [2014-01-23 23:25:01,067] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 478411; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] ->
> > PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> > PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> > java.nio.channels.UnresolvedAddressException
> >         at sun.nio.ch.Net.checkAddress(Net.java:89)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:01,077] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> > [2014-01-23 23:25:01,078] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 478412; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] ->
> > PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> > PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> > java.nio.channels.UnresolvedAddressException
> >         at sun.nio.ch.Net.checkAddress(Net.java:89)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:01,079] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> >
> >
> > It throws a bunch of these errors until it hits "too many open files"...
> >
> > [2014-01-23 23:25:03,756] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> > [2014-01-23 23:25:03,756] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 482406; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] ->
> > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > java.nio.channels.UnresolvedAddressException
> >         at sun.nio.ch.Net.checkAddress(Net.java:89)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 482407; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] ->
> > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > java.net.SocketException: Too many open files
> >         at sun.nio.ch.Net.socket0(Native Method)
> >         at sun.nio.ch.Net.socket(Net.java:156)
> >         at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> >         at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> >         at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 482408; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] ->
> > PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> > PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> > PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> > PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > java.net.SocketException: Too many open files
> >         at sun.nio.ch.Net.socket0(Native Method)
> >         at sun.nio.ch.Net.socket(Net.java:156)
> >         at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> >         at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> >         at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:03,758] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> >
> >
> >
> > I looked into the source code,
> > core/src/main/scala/kafka/consumer/SimpleConsumer.scala line 79:
> >
> > it tries to reconnect(), but it only closes the connection on
> > "java.io.IOException", which does not catch
> > "java.nio.channels.UnresolvedAddressException".
> >
> >   private def sendRequest(request: RequestOrResponse): Receive = {
> >     lock synchronized {
> >       getOrMakeConnection()
> >       var response: Receive = null
> >       try {
> >         blockingChannel.send(request)
> >         response = blockingChannel.receive()
> >       } catch {
> >         case e : java.io.IOException =>
> >           info("Reconnect due to socket error: %s".format(e.getMessage))
> >           // retry once
> >           try {
> >             reconnect()
> >             blockingChannel.send(request)
> >             response = blockingChannel.receive()
> >           } catch {
> >             case ioe: java.io.IOException =>
> >               disconnect()
> >               throw ioe
> >           }
> >         case e: Throwable => throw e
> >       }
> >       response
> >     }
> >   }
> >
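Jun's suggestion above ("disconnect on any exception") amounts to widening the
inner catch in the quoted sendRequest so the channel is always closed before
rethrowing. A rough Java sketch of that idea — the real code is Scala, and the
FakeChannel here is a made-up stand-in for Kafka's BlockingChannel:

```java
import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;

public class SendWithRetry {
    // Hypothetical channel: first send fails with an IOException, the retry
    // after "reconnecting" fails with a DNS-style UnresolvedAddressException.
    static class FakeChannel {
        boolean closed = false;
        int sends = 0;
        void send() throws IOException {
            sends++;
            if (sends == 1) throw new IOException("broken pipe");
            throw new UnresolvedAddressException();
        }
        void reconnect() { /* a real impl would open a new socket here */ }
        void close() { closed = true; }
    }

    static void sendRequest(FakeChannel ch) throws IOException {
        try {
            ch.send();
        } catch (IOException e) {
            // retry once, mirroring the quoted Scala code
            try {
                ch.reconnect();
                ch.send();
            } catch (Throwable t) { // original caught only IOException -> leak
                ch.close();         // always release the socket before rethrowing
                throw t;            // Java 7 precise rethrow keeps 'throws IOException'
            }
        }
    }

    public static void main(String[] args) throws IOException {
        FakeChannel ch = new FakeChannel();
        try {
            sendRequest(ch);
        } catch (UnresolvedAddressException expected) {
            // would have escaped the IOException-only handler in the original
        }
        System.out.println("closed=" + ch.closed); // prints "closed=true"
    }
}
```

With the IOException-only catch, the channel opened by reconnect() would never
be closed here, which matches the observed file-descriptor growth.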
> > This is my production setting
> >
> > OS: Ubuntu 12.04
> > kafka : kafka_2.8.0-0.8.0.jar
> >
> > java :
> > java version "1.6.0_27"
> > OpenJDK Runtime Environment (IcedTea6 1.12.6)
> > (6b27-1.12.6-1ubuntu0.12.04.4)
> > OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
> >
> > An interesting fact: if I close one of the process's file descriptors using
> >
> > $ gdb -p KAFKA_PID
> > > call close(4567)
> >
> > it reuses that file descriptor and establishes a connection to a consumer:
> >
> > java       9708      kafka 4087u     sock                0,7         0t0  3258461771 can't identify protocol
> > java       9708      kafka 4088u     IPv4         3441430493         0t0  TCP host2.provider.com:9092->consumer.host.com:38208 (ESTABLISHED)
> > java       9708      kafka 4089u     sock                0,7         0t0  3258461773 can't identify protocol
> > java       9708      kafka 4090u     sock                0,7         0t0  3258461774 can't identify protocol
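A leak like this can also be watched from inside the JVM by sampling the
open-descriptor count from /proc/self/fd. This is a Linux-specific sketch (the
path does not exist on other platforms); in practice the count would be
compared against the `ulimit -n` limit:

```java
import java.io.File;

public class FdCount {
    public static void main(String[] args) {
        // On Linux, each entry under /proc/self/fd is one open file descriptor
        // of the current process; listFiles() returns null on other platforms.
        File[] fds = new File("/proc/self/fd").listFiles();
        int count = (fds == null) ? -1 : fds.length;
        System.out.println("open fds: " + count);
    }
}
```

Logging this periodically (or exporting it as a metric) makes a slow socket
leak visible long before the "Too many open files" errors start.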
> >
> >
> >
> >
> > --
> > Ahmy Yulrizka
> > http://ahmy.yulrizka.com
> > @yulrizka
> >
> >
> > On Wed, Jan 22, 2014 at 5:41 AM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Hmm, without knowing the client ip, it's hard to tell whether those are
> > > from replication fetcher threads or not. Are most of those connections
> in
> > > established mode?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Jan 21, 2014 at 8:06 AM, Ahmy Yulrizka <ah...@yulrizka.com>
> > wrote:
> > >
> > > > This is the line I copied from lsof:
> > > >
> > > > ...
> > > > java      11818      kafka   98u     sock                0,7         0t0  615628183 can't identify protocol
> > > > java      11818      kafka   99u     IPv4          615077352         0t0  TCP somedomain.com:9092->121-123-123-123.someprovider.net:37547 (CLOSE_WAIT)
> > > > java      11818      kafka  100u     IPv4          615077353         0t0  TCP somedomain.com:9092->121-123-123-123.someprovider.net:37553 (ESTABLISHED)
> > > > java      11818      kafka  101u     sock                0,7         0t0  615628184 can't identify protocol
> > > > java      11818      kafka  102u     sock                0,7         0t0  615628185 can't identify protocol
> > > > java      11818      kafka  103u     sock                0,7         0t0  615628186 can't identify protocol
> > > > ...
> > > >
> > > > As you can see from the output, I can see the connection state on some
> > > > of the TCP sockets, but the "sock" entries only report "can't identify
> > > > protocol", so I cannot tell where these sockets originate from.
> > > >
> > > > I also cannot see those connections when I run netstat -nat.
> > > >
> > > > --
> > > > Ahmy Yulrizka
> > > > http://ahmy.yulrizka.com
> > > > @yulrizka
> > > >
> > > >
> > > > On Tue, Jan 21, 2014 at 4:42 PM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > What mode are those sockets in (established, closed, etc)? Also,
> from
> > > the
> > > > > ip, could you tell whether those sockets are from the client or
> from
> > > the
> > > > > replica fetcher in the brokers.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Jun Rao <ju...@gmail.com>.
Thanks for finding this out. We probably should disconnect on any exception.
Could you file a jira and perhaps attach a patch?

Thanks,

Jun
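
That "disconnect on any exception" idea can be sketched as follows. This is a minimal, self-contained Scala sketch using a hypothetical Channel stand-in, not Kafka's actual BlockingChannel/SimpleConsumer code:

```scala
import java.nio.channels.UnresolvedAddressException

// Hypothetical stand-in for kafka.network.BlockingChannel.
class Channel {
  var connected = false
  def connect(): Unit = { connected = true }
  def disconnect(): Unit = { connected = false }
  // Simulate a reconnect against a broker host that no longer resolves.
  def send(): Unit = throw new UnresolvedAddressException
}

val channel = new Channel
channel.connect()
try {
  channel.send()
} catch {
  // Catching Throwable (not only java.io.IOException) guarantees the
  // socket is closed before the exception propagates, so a DNS failure
  // like UnresolvedAddressException cannot leak a file descriptor.
  case e: Throwable =>
    channel.disconnect()
}
assert(!channel.connected) // channel was closed despite the non-IO exception
```

The difference from the current sendRequest is the final catch arm: it disconnects before rethrowing instead of rethrowing with the channel still open.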


On Fri, Jan 24, 2014 at 6:06 AM, Ahmy Yulrizka <ah...@yulrizka.com> wrote:

> Hi,
>
> I think I found the problem.
>
> This is part of the stack trace. First there was a connection problem,
> and when the connection was restored it got new information from ZooKeeper:
>
> [2014-01-23 23:24:55,391] INFO Opening socket connection to server
> host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:55,391] INFO Socket connection established to
> host2.provider.com/2.2.2.2:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:55,391] DEBUG Session establishment request sent on
> host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:57,393] INFO Client session timed out, have not heard
> from server in 2002ms for sessionid 0x0, closing socket connection and
> attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:58,047] INFO Opening socket connection to server
> host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:59,048] INFO Socket connection established to
> host3.provider.com/3.3.3.3:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:59,049] DEBUG Session establishment request sent on
> host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:59,539] INFO Session establishment complete on server
> host3.provider.com/3.3.3.3:2181, sessionid = 0x343c13436e50001, negotiated
> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:59,539] DEBUG Received event: WatchedEvent
> state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
> [2014-01-23 23:24:59,539] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> SyncConnected sent to
> kafka.server.KafkaZooKeeper$SessionExpireListener@163e1f27]
> (org.I0Itec.zkclient.ZkEventThread)
> [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> SyncConnected sent to
> kafka.controller.KafkaController$SessionExpirationListener@486f44d9]
> (org.I0Itec.zkclient.ZkEventThread)
> [2014-01-23 23:24:59,539] DEBUG Leaving process event
> (org.I0Itec.zkclient.ZkClient)
> [2014-01-23 23:24:59,539] DEBUG State is SyncConnected
> (org.I0Itec.zkclient.ZkClient)
> [2014-01-23 23:24:59,540] DEBUG State is SyncConnected
> (org.I0Itec.zkclient.ZkClient)
>
> Then the ReplicaFetcherThread tries to reconnect.
> At this point it tries to connect to the other brokers, but it can't
> resolve their IP addresses and throws
> "java.nio.channels.UnresolvedAddressException":
>
> [2014-01-23 23:25:01,067] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 478411; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:89)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:01,077] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-01-23 23:25:01,078] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 478412; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:89)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:01,079] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
>
>
> It throws a bunch of these errors until it hits "too many open files"...
>
> [2014-01-23 23:25:03,756] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-01-23 23:25:03,756] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 482406; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:89)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 482407; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:156)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 482408; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:156)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:03,758] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
>
>
>
> I looked into the source code of
> core/src/main/scala/kafka/consumer/SimpleConsumer.scala, line 79.
>
> It tries to reconnect(), but it only closes the connection on a
> "java.io.IOException", which does not catch
> "java.nio.channels.UnresolvedAddressException":
>
>   private def sendRequest(request: RequestOrResponse): Receive = {
>     lock synchronized {
>       getOrMakeConnection()
>       var response: Receive = null
>       try {
>         blockingChannel.send(request)
>         response = blockingChannel.receive()
>       } catch {
>         case e : java.io.IOException =>
>           info("Reconnect due to socket error: %s".format(e.getMessage))
>           // retry once
>           try {
>             reconnect()
>             blockingChannel.send(request)
>             response = blockingChannel.receive()
>           } catch {
>             case ioe: java.io.IOException =>
>               disconnect()
>               throw ioe
>           }
>         case e: Throwable => throw e
>       }
>       response
>     }
>   }
>
> This is my production setting
>
> OS: Ubuntu 12.04
> kafka : kafka_2.8.0-0.8.0.jar
>
> java :
> java version "1.6.0_27"
> OpenJDK Runtime Environment (IcedTea6 1.12.6)
> (6b27-1.12.6-1ubuntu0.12.04.4)
> OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
>
> An interesting fact: if I close one of the file descriptors of the process
> using
>
> $ gdb -p KAFKA_PID
> > call close(4567)
>
> it reuses that file descriptor and establishes a connection to a consumer:
>
> java       9708      kafka 4087u     sock                0,7         0t0  3258461771 can't identify protocol
> java       9708      kafka 4088u     IPv4         3441430493         0t0  TCP host2.provider.com:9092->consumer.host.com:38208 (ESTABLISHED)
> java       9708      kafka 4089u     sock                0,7         0t0  3258461773 can't identify protocol
> java       9708      kafka 4090u     sock                0,7         0t0  3258461774 can't identify protocol
>
>
>
>
> --
> Ahmy Yulrizka
> http://ahmy.yulrizka.com
> @yulrizka
>
>
> On Wed, Jan 22, 2014 at 5:41 AM, Jun Rao <ju...@gmail.com> wrote:
>
> > Hmm, without knowing the client ip, it's hard to tell whether those are
> > from replication fetcher threads or not. Are most of those connections in
> > established mode?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jan 21, 2014 at 8:06 AM, Ahmy Yulrizka <ah...@yulrizka.com>
> wrote:
> >
> > > this is the the line i copied on lsof
> > >
> > > ...
> > > java      11818      kafka   98u     sock                0,7       0t0
> > >  615628183 can't identify protocol
> > > java      11818      kafka   99u     IPv4          615077352       0t0
> > >    TCP somedomain.com:9092->121-123-123-123.someprovider.net:37547
> > > (CLOSE_WAIT)
> > > java      11818      kafka  100u     IPv4          615077353       0t0
> > >    TCP somedomain.com:9092->121-123-123-123.someprovider.net:37553
> > > (ESTABLISHED)
> > > java      11818      kafka  101u     sock                0,7       0t0
> > >  615628184 can't identify protocol
> > > java      11818      kafka  102u     sock                0,7       0t0
> > >  615628185 can't identify protocol
> > > java      11818      kafka  103u     sock                0,7       0t0
> > >  615628186 can't identify protocol
> > > ...
> > >
> > > as you can see, from the output, i could see the connection state on
> some
> > > of the TCP, but the sock only gives information "can't identify
> protocol"
> > > so I could not see where or from this sock is originating
> > >
> > > I could not see the connection also when i run netstat -nat
> > >
> > > --
> > > Ahmy Yulrizka
> > > http://ahmy.yulrizka.com
> > > @yulrizka
> > >
> > >
> > > On Tue, Jan 21, 2014 at 4:42 PM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > What mode are those sockets in (established, closed, etc)? Also, from
> > the
> > > > ip, could you tell whether those sockets are from the client or from
> > the
> > > > replica fetcher in the brokers.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Tue, Jan 21, 2014 at 3:29 AM, Ahmy Yulrizka <ah...@yulrizka.com>
> > > wrote:
> > > >
> > > > > We are running 3 kafka nodes, which servers 4 partition.
> > > > > We have been experiencing weird behavior during network outage.
> > > > >
> > > > > we had been experiencing twice in the last couple of days. the
> > previous
> > > > one
> > > > > took down all of the cluster.
> > > > > while this one only 2 out of 3 survive. and 1 node became the
> leader
> > of
> > > > all
> > > > > partition, and other node only in ISR of 1 partition (out of 4)
> > > > >
> > > > > my best guess now is that when the network down, the broker can't
> > > connect
> > > > > to other broker to do replication and keep opening the socket
> > > > > without closing it. But I'm not entirely sure about this.
> > > > >
> > > > > Is there any way to mitigate the problem ? or is there any
> > > configuration
> > > > > options to stop this from happening again ?
> > > > >
> > > > >
> > > > > The java/kafka process open too many socket file descriptor.
> > > > > running `lsof -a -p 11818` yield thousand of this line.
> > > > >
> > > > > ...
> > > > > java    11818 kafka 3059u  sock                0,7       0t0
> > 615637305
> > > > > can't identify protocol
> > > > > java    11818 kafka 3060u  sock                0,7       0t0
> > 615637306
> > > > > can't identify protocol
> > > > > java    11818 kafka 3061u  sock                0,7       0t0
> > 615637307
> > > > > can't identify protocol
> > > > > java    11818 kafka 3062u  sock                0,7       0t0
> > 615637308
> > > > > can't identify protocol
> > > > > java    11818 kafka 3063u  sock                0,7       0t0
> > 615637309
> > > > > can't identify protocol
> > > > > java    11818 kafka 3064u  sock                0,7       0t0
> > 615637310
> > > > > can't identify protocol
> > > > > java    11818 kafka 3065u  sock                0,7       0t0
> > 615637311
> > > > > can't identify protocol
> > > > > ...
> > > > >
> > > > > i verify that the the open socket did not close when i repeated the
> > > > command
> > > > > after 2 minutes.
> > > > >
> > > > >
> > > > > and the kafka log on the broken node, generate lots of error like
> > this:
> > > > >
> > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > java.io.IOException: Too many open files
> > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native
> Method)
> > > > >         at
> > > > >
> > > >
> > >
> >
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > java.io.IOException: Too many open files
> > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native
> Method)
> > > > >         at
> > > > >
> > > >
> > >
> >
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > [2014-01-21 04:21:48,811]  64573917 [ReplicaFetcherThread-0-1] INFO
> > > > >  kafka.consumer.SimpleConsumer  - Reconnect due to socket error:
> null
> > > > > [2014-01-21 04:21:48,819]  64573925 [ReplicaFetcherThread-0-1] WARN
> > > > >  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-1],
> > Error
> > > > in
> > > > > fetch Name: FetchRequest; Version: 0; CorrelationId: 74930218;
> > > ClientId:
> > > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes:
> 1
> > > > bytes;
> > > > > RequestInfo: [some-topic,0] ->
> > > > > PartitionFetchInfo(959825,1048576),[some-topic,3] ->
> > > > > PartitionFetchInfo(551546,1048576)
> > > > > java.net.SocketException: Too many open files
> > > > >         at sun.nio.ch.Net.socket0(Native Method)
> > > > >         at sun.nio.ch.Net.socket(Net.java:156)
> > > > >         at
> > > > sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > > >         at
> > java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > > >         at
> > > > kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > > >         at
> > > kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > > >         at
> > > > kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > > >         at
> > > > >
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > >         at
> > > kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > >         at
> > > > >
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > > >         at
> > > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > >
> > > > >
> > > > > --
> > > > > Ahmy Yulrizka
> > > > > http://ahmy.yulrizka.com
> > > > > @yulrizka
> > > > >
> > > >
> > >
> >
>

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Ahmy Yulrizka <ah...@yulrizka.com>.
Hi,

I think I found the problem.

This is part of the stack trace. First there was a connection problem,
and when the connection was restored it got new information from ZooKeeper:

[2014-01-23 23:24:55,391] INFO Opening socket connection to server
host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
[2014-01-23 23:24:55,391] INFO Socket connection established to
host2.provider.com/2.2.2.2:2181, initiating session
(org.apache.zookeeper.ClientCnxn)
[2014-01-23 23:24:55,391] DEBUG Session establishment request sent on
host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
[2014-01-23 23:24:57,393] INFO Client session timed out, have not heard
from server in 2002ms for sessionid 0x0, closing socket connection and
attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2014-01-23 23:24:58,047] INFO Opening socket connection to server
host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
[2014-01-23 23:24:59,048] INFO Socket connection established to
host3.provider.com/3.3.3.3:2181, initiating session
(org.apache.zookeeper.ClientCnxn)
[2014-01-23 23:24:59,049] DEBUG Session establishment request sent on
host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
[2014-01-23 23:24:59,539] INFO Session establishment complete on server
host3.provider.com/3.3.3.3:2181, sessionid = 0x343c13436e50001, negotiated
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-01-23 23:24:59,539] DEBUG Received event: WatchedEvent
state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
[2014-01-23 23:24:59,539] INFO zookeeper state changed (SyncConnected)
(org.I0Itec.zkclient.ZkClient)
[2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
SyncConnected sent to
kafka.server.KafkaZooKeeper$SessionExpireListener@163e1f27]
(org.I0Itec.zkclient.ZkEventThread)
[2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
SyncConnected sent to
kafka.controller.KafkaController$SessionExpirationListener@486f44d9]
(org.I0Itec.zkclient.ZkEventThread)
[2014-01-23 23:24:59,539] DEBUG Leaving process event
(org.I0Itec.zkclient.ZkClient)
[2014-01-23 23:24:59,539] DEBUG State is SyncConnected
(org.I0Itec.zkclient.ZkClient)
[2014-01-23 23:24:59,540] DEBUG State is SyncConnected
(org.I0Itec.zkclient.ZkClient)

Then the ReplicaFetcherThread tries to reconnect.
At this point it tries to connect to the other brokers, but it can't
resolve their IP addresses and throws
"java.nio.channels.UnresolvedAddressException":

[2014-01-23 23:25:01,067] WARN [ReplicaFetcherThread-0-1], Error in fetch
Name: FetchRequest; Version: 0; CorrelationId: 478411; ClientId:
ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [some-topic,0] ->
PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:89)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-01-23 23:25:01,077] INFO Reconnect due to socket error: null
(kafka.consumer.SimpleConsumer)
[2014-01-23 23:25:01,078] WARN [ReplicaFetcherThread-0-1], Error in fetch
Name: FetchRequest; Version: 0; CorrelationId: 478412; ClientId:
ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [some-topic,0] ->
PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:89)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-01-23 23:25:01,079] INFO Reconnect due to socket error: null
(kafka.consumer.SimpleConsumer)
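
The UnresolvedAddressException in the traces above never reaches the IOException handler in SimpleConsumer because it is not an IOException at all: it extends IllegalArgumentException, a RuntimeException. A quick Scala check of the hierarchy:

```scala
import java.io.IOException
import java.nio.channels.UnresolvedAddressException

val e = new UnresolvedAddressException
// UnresolvedAddressException extends IllegalArgumentException, so an
// IOException-only catch clause will never match it.
println(e.isInstanceOf[IOException])              // false
println(e.isInstanceOf[IllegalArgumentException]) // true
```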


It throws a bunch of these errors until it hits "too many open files"...

[2014-01-23 23:25:03,756] INFO Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-01-23 23:25:03,756] WARN [ReplicaFetcherThread-0-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 482406; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),[some-topic,2] -> PartitionFetchInfo(1135106,1048576),[some-topic,3] -> PartitionFetchInfo(839677,1048576),[some-topic,1] -> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:89)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 482407; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),[some-topic,2] -> PartitionFetchInfo(1135106,1048576),[some-topic,3] -> PartitionFetchInfo(839677,1048576),[some-topic,1] -> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
java.net.SocketException: Too many open files
        at sun.nio.ch.Net.socket0(Native Method)
        at sun.nio.ch.Net.socket(Net.java:156)
        at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
        at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
        at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 482408; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),[some-topic,2] -> PartitionFetchInfo(1135106,1048576),[some-topic,3] -> PartitionFetchInfo(839677,1048576),[some-topic,1] -> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
java.net.SocketException: Too many open files
        (same stack trace as above, from sun.nio.ch.Net.socket0 through kafka.utils.ShutdownableThread.run)
[2014-01-23 23:25:03,758] INFO Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)



I looked into the source code, core/src/main/scala/kafka/consumer/SimpleConsumer.scala line 79.

It tries to reconnect(), but it only closes the connection on "java.io.IOException", which does not catch "java.nio.channels.UnresolvedAddressException" (that exception extends IllegalArgumentException, not IOException), so the half-open channel is never closed:

  private def sendRequest(request: RequestOrResponse): Receive = {
    lock synchronized {
      getOrMakeConnection()
      var response: Receive = null
      try {
        blockingChannel.send(request)
        response = blockingChannel.receive()
      } catch {
        case e : java.io.IOException =>
          info("Reconnect due to socket error: %s".format(e.getMessage))
          // retry once
          try {
            reconnect()
            blockingChannel.send(request)
            response = blockingChannel.receive()
          } catch {
            case ioe: java.io.IOException =>
              disconnect()
              throw ioe
          }
        // UnresolvedAddressException (not an IOException) lands here and is
        // rethrown WITHOUT disconnect(), leaking the channel's file descriptor
        case e: Throwable => throw e
      }
      response
    }
  }
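To make the failure mode concrete, here is a small, self-contained sketch. The names (FakeChannel, leakySend, safeSend) are hypothetical stand-ins, not Kafka's actual classes. It shows both halves of the problem: the pattern match above never treats UnresolvedAddressException as an IOException, and a catch-all that closes the channel before rethrowing would plug the leak:

```scala
import java.io.Closeable
import java.nio.channels.UnresolvedAddressException

// Stand-in for kafka.network.BlockingChannel (illustrative only).
final class FakeChannel extends Closeable {
  var open = true
  // Simulates connect() during a DNS outage: the peer hostname no longer
  // resolves, so NIO throws UnresolvedAddressException.
  def send(): Unit = throw new UnresolvedAddressException()
  override def close(): Unit = open = false
}

object LeakDemo {
  // Mirrors sendRequest() above: only IOException triggers cleanup, so the
  // channel (and its file descriptor) leaks on UnresolvedAddressException.
  def leakySend(ch: FakeChannel): Unit =
    try ch.send()
    catch {
      case e: java.io.IOException => ch.close(); throw e
      case e: Throwable           => throw e // fd leaked here
    }

  // One possible fix: close on any Throwable before rethrowing.
  def safeSend(ch: FakeChannel): Unit =
    try ch.send()
    catch { case e: Throwable => ch.close(); throw e }

  def main(args: Array[String]): Unit = {
    val a = new FakeChannel
    try leakySend(a) catch { case _: Throwable => () }
    val b = new FakeChannel
    try safeSend(b) catch { case _: Throwable => () }
    println(s"leaky channel still open: ${a.open}") // true
    println(s"safe channel still open:  ${b.open}") // false
  }
}
```

This is only a sketch of the leak mechanism as I understand it, not the official Kafka fix.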

This is my production setup:

OS: Ubuntu 12.04
Kafka: kafka_2.8.0-0.8.0.jar

Java:
java version "1.6.0_27"
OpenJDK Runtime Environment (IcedTea6 1.12.6) (6b27-1.12.6-1ubuntu0.12.04.4)
OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)

One interesting fact: if I close one of the process's file descriptors with gdb

$ gdb -p KAFKA_PID
> call close(4567)

the process reuses that file descriptor and establishes a connection to a consumer:

java       9708      kafka 4087u     sock                0,7         0t0  3258461771 can't identify protocol
java       9708      kafka 4088u     IPv4         3441430493         0t0  TCP host2.provider.com:9092->consumer.host.com:38208 (ESTABLISHED)
java       9708      kafka 4089u     sock                0,7         0t0  3258461773 can't identify protocol
java       9708      kafka 4090u     sock                0,7         0t0  3258461774 can't identify protocol
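Until the leak itself is fixed, one way to catch it early is to watch the broker's descriptor count over time. A rough sketch (the PID here is a placeholder; substitute the Kafka broker's real PID):

```shell
# Count open file descriptors for a process, and how many of them are the
# orphaned "can't identify protocol" sockets seen in the lsof output above.
# PID=$$ is a placeholder for demonstration; use the broker's actual PID.
PID=$$
total=$(ls /proc/$PID/fd | wc -l)
orphaned=$(lsof -a -p $PID 2>/dev/null | grep -c "can't identify protocol")
echo "total fds: $total, orphaned sockets: $orphaned"
```

Feeding these two numbers into whatever monitoring you already run would alert well before the ulimit is exhausted.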




--
Ahmy Yulrizka
http://ahmy.yulrizka.com
@yulrizka


On Wed, Jan 22, 2014 at 5:41 AM, Jun Rao <ju...@gmail.com> wrote:

> Hmm, without knowing the client ip, it's hard to tell whether those are
> from replication fetcher threads or not. Are most of those connections in
> established mode?
>
> Thanks,
>
> Jun
>
>
> On Tue, Jan 21, 2014 at 8:06 AM, Ahmy Yulrizka <ah...@yulrizka.com> wrote:
>
> > this is the the line i copied on lsof
> >
> > ...
> > java      11818      kafka   98u     sock                0,7       0t0
> >  615628183 can't identify protocol
> > java      11818      kafka   99u     IPv4          615077352       0t0
> >    TCP somedomain.com:9092->121-123-123-123.someprovider.net:37547
> > (CLOSE_WAIT)
> > java      11818      kafka  100u     IPv4          615077353       0t0
> >    TCP somedomain.com:9092->121-123-123-123.someprovider.net:37553
> > (ESTABLISHED)
> > java      11818      kafka  101u     sock                0,7       0t0
> >  615628184 can't identify protocol
> > java      11818      kafka  102u     sock                0,7       0t0
> >  615628185 can't identify protocol
> > java      11818      kafka  103u     sock                0,7       0t0
> >  615628186 can't identify protocol
> > ...
> >
> > as you can see, from the output, i could see the connection state on some
> > of the TCP, but the sock only gives information "can't identify protocol"
> > so I could not see where or from this sock is originating
> >
> > I could not see the connection also when i run netstat -nat
> >
> > --
> > Ahmy Yulrizka
> > http://ahmy.yulrizka.com
> > @yulrizka
> >
> >
> > On Tue, Jan 21, 2014 at 4:42 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > What mode are those sockets in (established, closed, etc)? Also, from
> the
> > > ip, could you tell whether those sockets are from the client or from
> the
> > > replica fetcher in the brokers.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Jan 21, 2014 at 3:29 AM, Ahmy Yulrizka <ah...@yulrizka.com>
> > wrote:
> > >
> > > > We are running 3 kafka nodes, which servers 4 partition.
> > > > We have been experiencing weird behavior during network outage.
> > > >
> > > > we had been experiencing twice in the last couple of days. the
> previous
> > > one
> > > > took down all of the cluster.
> > > > while this one only 2 out of 3 survive. and 1 node became the leader
> of
> > > all
> > > > partition, and other node only in ISR of 1 partition (out of 4)
> > > >
> > > > my best guess now is that when the network down, the broker can't
> > connect
> > > > to other broker to do replication and keep opening the socket
> > > > without closing it. But I'm not entirely sure about this.
> > > >
> > > > Is there any way to mitigate the problem ? or is there any
> > configuration
> > > > options to stop this from happening again ?
> > > >
> > > >
> > > > The java/kafka process open too many socket file descriptor.
> > > > running `lsof -a -p 11818` yield thousand of this line.
> > > >
> > > > ...
> > > > java    11818 kafka 3059u  sock                0,7       0t0
> 615637305
> > > > can't identify protocol
> > > > java    11818 kafka 3060u  sock                0,7       0t0
> 615637306
> > > > can't identify protocol
> > > > java    11818 kafka 3061u  sock                0,7       0t0
> 615637307
> > > > can't identify protocol
> > > > java    11818 kafka 3062u  sock                0,7       0t0
> 615637308
> > > > can't identify protocol
> > > > java    11818 kafka 3063u  sock                0,7       0t0
> 615637309
> > > > can't identify protocol
> > > > java    11818 kafka 3064u  sock                0,7       0t0
> 615637310
> > > > can't identify protocol
> > > > java    11818 kafka 3065u  sock                0,7       0t0
> 615637311
> > > > can't identify protocol
> > > > ...
> > > >
> > > > i verify that the the open socket did not close when i repeated the
> > > command
> > > > after 2 minutes.
> > > >
> > > >
> > > > and the kafka log on the broken node, generate lots of error like
> this:
> > > >
> > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > kafka.network.Acceptor  - Error in acceptor
> > > > java.io.IOException: Too many open files
> > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> > > >         at
> > > >
> > >
> >
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > >         at java.lang.Thread.run(Thread.java:701)
> > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > kafka.network.Acceptor  - Error in acceptor
> > > > java.io.IOException: Too many open files
> > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> > > >         at
> > > >
> > >
> >
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > >         at java.lang.Thread.run(Thread.java:701)
> > > > [2014-01-21 04:21:48,811]  64573917 [ReplicaFetcherThread-0-1] INFO
> > > >  kafka.consumer.SimpleConsumer  - Reconnect due to socket error: null
> > > > [2014-01-21 04:21:48,819]  64573925 [ReplicaFetcherThread-0-1] WARN
> > > >  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-1],
> Error
> > > in
> > > > fetch Name: FetchRequest; Version: 0; CorrelationId: 74930218;
> > ClientId:
> > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> > > bytes;
> > > > RequestInfo: [some-topic,0] ->
> > > > PartitionFetchInfo(959825,1048576),[some-topic,3] ->
> > > > PartitionFetchInfo(551546,1048576)
> > > > java.net.SocketException: Too many open files
> > > >         at sun.nio.ch.Net.socket0(Native Method)
> > > >         at sun.nio.ch.Net.socket(Net.java:156)
> > > >         at
> > > sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > >         at
> > > >
> > > >
> > >
> >
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > >         at
> java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > >         at
> > > kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > >         at
> > kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > >         at
> > > kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > >         at
> > > > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > >         at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > >         at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > >         at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > >         at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >         at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > >         at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > >         at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >         at
> > kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > >         at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > >         at
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > >         at
> > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > >
> > > >
> > > > --
> > > > Ahmy Yulrizka
> > > > http://ahmy.yulrizka.com
> > > > @yulrizka
> > > >
> > >
> >
>

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Jun Rao <ju...@gmail.com>.
What mode are those sockets in (established, closed, etc)? Also, from the
ip, could you tell whether those sockets are from the client or from the
replica fetcher in the brokers.

Thanks,

Jun


On Tue, Jan 21, 2014 at 3:29 AM, Ahmy Yulrizka <ah...@yulrizka.com> wrote:

> We are running 3 Kafka nodes serving 4 partitions.
> We have been seeing weird behavior during network outages.
>
> We have experienced this twice in the last couple of days. The previous
> outage took down the whole cluster, while this time 2 out of 3 nodes
> survived: 1 node became the leader of all partitions, and the other node
> is in the ISR of only 1 partition (out of 4).
>
> My best guess is that when the network is down, the broker can't connect
> to the other brokers to replicate, and it keeps opening sockets without
> closing them. But I'm not entirely sure about this.
>
> Is there any way to mitigate the problem? Or are there any configuration
> options to stop this from happening again?
>
>
> The java/kafka process holds too many socket file descriptors.
> Running `lsof -a -p 11818` yields thousands of lines like this:
>
> ...
> java    11818 kafka 3059u  sock                0,7       0t0 615637305 can't identify protocol
> java    11818 kafka 3060u  sock                0,7       0t0 615637306 can't identify protocol
> java    11818 kafka 3061u  sock                0,7       0t0 615637307 can't identify protocol
> java    11818 kafka 3062u  sock                0,7       0t0 615637308 can't identify protocol
> java    11818 kafka 3063u  sock                0,7       0t0 615637309 can't identify protocol
> java    11818 kafka 3064u  sock                0,7       0t0 615637310 can't identify protocol
> java    11818 kafka 3065u  sock                0,7       0t0 615637311 can't identify protocol
> ...
>
> I verified that the open sockets were still there when I repeated the
> command after 2 minutes.
>
>
> The Kafka log on the broken node shows lots of errors like this:
>
> [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> kafka.network.Acceptor  - Error in acceptor
> java.io.IOException: Too many open files
>         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
>         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
>         at kafka.network.Acceptor.accept(SocketServer.scala:200)
>         at kafka.network.Acceptor.run(SocketServer.scala:154)
>         at java.lang.Thread.run(Thread.java:701)
> [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> kafka.network.Acceptor  - Error in acceptor
> java.io.IOException: Too many open files
>         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
>         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
>         at kafka.network.Acceptor.accept(SocketServer.scala:200)
>         at kafka.network.Acceptor.run(SocketServer.scala:154)
>         at java.lang.Thread.run(Thread.java:701)
> [2014-01-21 04:21:48,811]  64573917 [ReplicaFetcherThread-0-1] INFO
>  kafka.consumer.SimpleConsumer  - Reconnect due to socket error: null
> [2014-01-21 04:21:48,819]  64573925 [ReplicaFetcherThread-0-1] WARN
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-1], Error in
> fetch Name: FetchRequest; Version: 0; CorrelationId: 74930218; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(959825,1048576),[some-topic,3] ->
> PartitionFetchInfo(551546,1048576)
> java.net.SocketException: Too many open files
>         at sun.nio.ch.Net.socket0(Native Method)
>         at sun.nio.ch.Net.socket(Net.java:156)
>         at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
>         at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
>         at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
> --
> Ahmy Yulrizka
> http://ahmy.yulrizka.com
> @yulrizka
>
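
On the mitigation question: as a stop-gap while the underlying leak is
addressed, the broker's file-descriptor limit can be raised so the "Too many
open files" errors above take longer to appear. A sketch, assuming a Linux
host and a broker running as user `kafka`; the limit values are illustrative:

```shell
# Check the limit the running broker actually got (PID from the thread).
PID=11818
grep 'Max open files' "/proc/$PID/limits" 2>/dev/null

# Raise it persistently in /etc/security/limits.conf (the broker must be
# restarted from a fresh login session to pick the new values up):
#   kafka  soft  nofile  65536
#   kafka  hard  nofile  65536

# Or for the current shell, just before starting the broker:
ulimit -n 65536 2>/dev/null || true
```

This only buys time: leaked descriptors still accumulate, so the real fix is
closing the sockets (or upgrading past the fetcher-thread leak).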