Posted to users@kafka.apache.org by Helin Xiang <xk...@gmail.com> on 2014/12/10 04:09:59 UTC

Re: Possibly leaking socket on ReplicaFetcherThread

Hi, Jun

We experienced a network device problem that caused all of our brokers to crash.
After investigation, we found the server logs throwing two kinds of exceptions.

This:

java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:29)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)


And this:

2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL kafka.server.ReplicaManager  - [Replica Manager on Broker 1]: Error writing to highwatermark file:
java.io.FileNotFoundException: /data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
        at java.io.FileWriter.<init>(FileWriter.java:73)
        at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
        at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
        at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)


We counted the occurrences of java.nio.channels.UnresolvedAddressException and
found around 63,000 of them. Since a healthy Kafka broker opens about 2k fds in
our environment, we believe the number of open fds hit our system's limit of 65535.
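
As an aside, here is a small sketch for watching a broker's open-fd count from
inside the JVM. It assumes a HotSpot/OpenJDK JVM on Unix, since
com.sun.management is not part of the standard Java API:

import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

// Sketch: report this JVM's open-fd usage against its limit.
object FdMonitor {
  def main(args: Array[String]): Unit = {
    ManagementFactory.getOperatingSystemMXBean match {
      case os: UnixOperatingSystemMXBean =>
        // prints e.g. "open fds: 2013 / 65535"
        println("open fds: " + os.getOpenFileDescriptorCount +
                " / " + os.getMaxFileDescriptorCount)
      case _ =>
        println("fd counts not available on this platform")
    }
  }
}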

So it seems the bug is not fixed.

After checking the code, we believe it can still leak socket fds.
===============================================
Our guess:

In SimpleConsumer.scala:

  private def disconnect() = {
    if(blockingChannel.isConnected) {
      debug("Disconnecting from " + host + ":" + port)
      blockingChannel.disconnect()
    }
  }

But when the exception happens, blockingChannel.isConnected is still false,
because in BlockingChannel.scala:

  def connect() = lock synchronized  {
    if(!connected) {
      channel = SocketChannel.open()
      if(readBufferSize > 0)
        channel.socket.setReceiveBufferSize(readBufferSize)
      if(writeBufferSize > 0)
        channel.socket.setSendBufferSize(writeBufferSize)
      channel.configureBlocking(true)
      channel.socket.setSoTimeout(readTimeoutMs)
      channel.socket.setKeepAlive(true)
      channel.socket.setTcpNoDelay(true)
      channel.connect(new InetSocketAddress(host, port))   <-- exception happens here

      writeChannel = channel
      readChannel = Channels.newChannel(channel.socket().getInputStream)
      connected = true   <-- never reached, so connected stays false
      ... ...
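
To make the failure mode concrete, here is a minimal, self-contained sketch
(illustrative names only, not the actual Kafka classes) of how a guarded
disconnect leaks an fd whenever connect fails before the flag is set:

import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

// Mimics the connect/disconnect state machine described above.
object LeakSketch {
  private var channel: SocketChannel = null
  private var connected = false

  def connect(): Unit = {
    channel = SocketChannel.open()   // fd allocated here
    // An unresolvable host yields an unresolved InetSocketAddress, so
    // SocketChannel.connect throws UnresolvedAddressException.
    channel.connect(new InetSocketAddress("no-such-host.invalid", 9092))
    connected = true                 // never reached on failure
  }

  def guardedDisconnect(): Unit = {
    if (connected) {                 // still false after the failed connect,
      channel.close()                // so the open channel is never closed
      connected = false
    }
  }

  def main(args: Array[String]): Unit = {
    try connect()
    catch { case _: Throwable => guardedDisconnect() }
    // channel is still open here; repeated on every fetch retry, this
    // walks the process up to its fd limit.
  }
}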


Thanks.


-- 


*Best Regards*
Xiang Helin

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Jun Rao <ju...@confluent.io>.
Ah, makes sense. It seems this is already fixed in 0.8.2 and trunk.
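
For anyone reading along, a sketch of the shape of such a fix (written from
the description in this thread, not copied from the 0.8.2 source, so details
may differ) is to have BlockingChannel.connect() clean up after itself when
the connect attempt throws:

  def connect() = lock synchronized {
    if(!connected) {
      try {
        channel = SocketChannel.open()
        // ... buffer sizes, blocking mode, and socket options as before ...
        channel.connect(new InetSocketAddress(host, port))
        writeChannel = channel
        readChannel = Channels.newChannel(channel.socket().getInputStream)
        connected = true
      } catch {
        // If anything above throws (e.g. UnresolvedAddressException),
        // close whatever was opened so the fd cannot leak, then rethrow.
        case e: Throwable =>
          disconnect()
          throw e
      }
    }
  }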

Thanks,

Jun


Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Helin Xiang <xk...@gmail.com>.
Hi, Jun

What you said is right, but in the SimpleConsumer code (where
BlockingChannel.disconnect() is called), it first checks whether the channel
is connected, and that check is the real problem.

We reproduced the problem in our testing environment:
first, we used iptables to drop packets and reset TCP connections to/from the
other broker;
second, we used iptables to drop packets to/from the DNS server.

The broker then crashed immediately.

After we changed the code, it no longer crashes.

=========

--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -46,10 +46,10 @@ class SimpleConsumer(val host: String,
   }

   private def disconnect() = {
-    if(blockingChannel.isConnected) {
+    //if(blockingChannel.isConnected) {
       debug("Disconnecting from " + host + ":" + port)
       blockingChannel.disconnect()
-    }
+    //}
   }
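
One note on this workaround: dropping the guard is only safe if
BlockingChannel.disconnect() tolerates a channel that never finished
connecting (or was never opened at all). A null-safe shape for it, as a
sketch rather than the actual BlockingChannel code, would be:

  def disconnect() = lock synchronized {
    // Key the cleanup off whether a channel object exists, not off the
    // `connected` flag, which is still false after a failed connect().
    if(channel != null) {
      try { channel.close() } catch { case _: Throwable => } // swallow close errors
      channel = null
      readChannel = null
      writeChannel = null
    }
    connected = false
  }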




-- 


*Best Regards*
向河林 (Xiang Helin)

Re: Possibly leaking socket on ReplicaFetcherThread

Posted by Jun Rao <ju...@confluent.io>.
Hmm, but if we hit an exception in BlockingChannel.connect(), we will
call BlockingChannel.disconnect(), which will close the socket channel.
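
For context, the retry path being discussed looks roughly like this
(simplified from SimpleConsumer, so treat it as a sketch rather than a
verbatim copy):

  // On a socket error, sendRequest() retries the request via reconnect():
  private def reconnect() {
    disconnect()   // intended to close the old channel before retrying
    connect()      // opens a fresh SocketChannel and connects again
  }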

Thanks,

Jun
