Posted to users@kafka.apache.org by Rajiv Kurian <ra...@signalfuse.com> on 2015/01/26 23:27:54 UTC

SimpleConsumer leaks sockets on an UnresolvedAddressException

Here is my typical flow:
void run() {
  if (simpleConsumer == null) {
    simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout,
        kafkaReceiveBufferSize, clientName);
  }
  try {
    // Do stuff with simpleConsumer.
  } catch (Exception e) {
    if (simpleConsumer != null) {
      simpleConsumer.close();
      simpleConsumer = null;
    }
  }
}

If there is a problem with the host name, or some DNS issue, we get an
UnresolvedAddressException as expected and attempt to close the
simpleConsumer. However, this does not actually release the underlying
socket, so we leak an FD every time this happens. Though this is not a
common case, I think there needs to be a way for the SimpleConsumer to
release all OS resources it is holding. Right now, if this keeps happening,
the number of FDs consumed by the process keeps increasing until we hit the
OS limit. As a user I can do nothing but call simpleConsumer.close(). We
need to be able to close the underlying socketChannel/socket when this kind
of error happens.
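
Until that exists, one defensive workaround (my own sketch, not part of the
SimpleConsumer API) is to resolve the broker address yourself before
constructing the consumer, so an unresolvable host never reaches the
socket-opening code path. The helper name `resolvable` is hypothetical:

```java
import java.net.InetSocketAddress;

public class ResolveFirst {
    // Hypothetical pre-flight check: the InetSocketAddress constructor
    // performs the DNS lookup, and isUnresolved() reports whether that
    // lookup failed -- no socket is ever opened.
    static boolean resolvable(String host, int port) {
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    public static void main(String[] args) {
        // ".invalid" is a reserved TLD that never resolves (RFC 2606).
        System.out.println(resolvable("localhost", 9092));
        System.out.println(resolvable("no-such-host.invalid", 9092));
    }
}
```

Only construct the SimpleConsumer when the check passes; otherwise sleep
and retry, and no FD is allocated in the meantime.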

To reproduce, run this code with any garbage host name; running lsof -p
against the process will show the number of open FDs increasing without
limit.
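
On Linux you can also watch the count from inside the process instead of
via lsof; this small sketch (independent of Kafka, and Linux-only) just
counts the entries in /proc/self/fd:

```java
import java.io.File;

public class OpenFds {
    // Number of open file descriptors of the current process, obtained by
    // listing /proc/self/fd. Returns -1 on platforms without procfs.
    static int openFdCount() {
        String[] entries = new File("/proc/self/fd").list();
        return entries == null ? -1 : entries.length;
    }

    public static void main(String[] args) {
        // Logged once per loop iteration, this makes the leak visible
        // as a monotonically growing number.
        System.out.println("open fds: " + openFdCount());
    }
}
```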

Thanks,
Rajiv

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

Posted by Guozhang Wang <wa...@gmail.com>.
Ewen, you are right; the patch was committed on Feb 20th last year. I will
leave a comment and close the ticket.

On Tue, Jan 27, 2015 at 7:24 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
> will only be included in 0.8.2.
>
> Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug
> is still open and there's a comment that moved it to 0.9 after the commit
> was already made. Was the commit a mistake or did we just forget to close
> it?
>
> On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Here is the relevant stack trace:
> >
> > java.nio.channels.UnresolvedAddressException: null
> >
> >         at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
> >
> >         at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
> > ~[na:1.7.0_55]
> >
> >         at kafka.network.BlockingChannel.connect(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at
> >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.metrics.KafkaTimer.time(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at
> > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown
> Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.metrics.KafkaTimer.time(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > I am using 0.8.1. The source is here:
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> > >
> > > Here is the definition of disconnect():
> > > private def disconnect() = {
> > >     if(blockingChannel.isConnected) {
> > >       debug("Disconnecting from " + host + ":" + port)
> > >       blockingChannel.disconnect()
> > >     }
> > >   }
> > > It checks if blockingChannel.isConnected before calling
> > > blockingChannel.disconnect(). I think if there is an
> > > UnresolvedAddressException, the isConnected is never set and the
> > > blockingChannel.disconnect() is never called. But by this point we have
> > > already created a socket and will leak it.
> > >
> > > The same problem might be present in the connect method of the
> > > BlockingChannel at
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala
> > .
> > > Though its own disconnect method seems to check for both the connected:
> > >
> > > def disconnect() = lock synchronized {
> > >     // My comment: connected will not be set if we get an
> > > UnresolvedAddressException but channel should NOT  be null, so we will
> > > probably still do the right thing.
> > >     if(connected || channel != null) {
> > >       // closing the main socket channel *should* close the read
> channel
> > >       // but let's do it to be sure.
> > >       swallow(channel.close())
> > >       swallow(channel.socket.close())
> > >       if(readChannel != null) swallow(readChannel.close())
> > >       channel = null; readChannel = null; writeChannel = null
> > >       connected = false
> > >     }
> > >   }
> > >
> > >
> > >
> > > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > >> Rajiv,
> > >>
> > >> Which version of Kafka are you using? I just checked SimpleConsumer's
> > >> code,
> > >> and in its close() function, disconnect() is called, which will close
> > the
> > >> socket.
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
> > >> wrote:
> > >>
> > >> > Meant to write a run loop.
> > >> >
> > >> > void run() {
> > >> >   while (running) {
> > >> >     if (simpleConsumer == null) {
> > >> >       simpleConsumer = new SimpleConsumer(host, port,
> > >> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
> > >> >     }
> > >> >     try {
> > >> >       // Do stuff with simpleConsumer.
> > >> >     } catch (Exception e) {
> > >> >       logger.error(e);  // Assume UnresolvedAddressException.
> > >> >       if (consumer != null) {
> > >> >         simpleConsumer.close();
> > >> >         simpleConsumer = null;
> > >> >       }
> > >> >     }
> > >> >   }
> > >> > }
> > >> >
> > >> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <rajiv@signalfuse.com
> >
> > >> > wrote:
> > >> >
> > >> > > Here is my typical flow:
> > >> > > void run() {
> > >> > >   if (simpleConsumer == null) {
> > >> > >     simpleConsumer = new SimpleConsumer(host, port, (int)
> > >> > kafkaSocketTimeout,
> > >> > > kafkaRExeiveBufferSize, clientName);
> > >> > >   }
> > >> > >   try {
> > >> > >     // Do stuff with simpleConsumer.
> > >> > >    } catch (Exception e) {
> > >> > >      if (consumer != null) {
> > >> > >        simpleConsumer.close();
> > >> > >        simpleConsumer = null;
> > >> > >      }
> > >> > >   }
> > >> > > }
> > >> > >
> > >> > > If there is a problem with the host name, or some DNS issues, we
> get
> > >> an
> > >> > > UnresolvedAddressException as expected and attempt to close the
> > >> > > simpleConsumer. However this does not really get rid of the
> > underlying
> > >> > > socket. So we end up leaking a FD every time this happens. Though
> > >> this is
> > >> > > not a common case I think there needs to be a way on the
> > >> SimpleConsumer
> > >> > to
> > >> > > get rid of all OS resources that it is holding onto. Right now if
> > this
> > >> > > keeps happening the number of FDs being consumed by the process
> > keeps
> > >> > > increasing till we hit the OS limits. As a user I cannot do
> anything
> > >> else
> > >> > > but call simpleConsumer.close(). We need to be able to close the
> > >> > underlying
> > >> > > socketChannel/socket when this kind of an error happens.
> > >> > >
> > >> > > To reproduce, one can just run this code but just put in any
> garbage
> > >> host
> > >> > > name, running lsof -p while running this will show that the open
> FDs
> > >> > > increases without limit.
> > >> > >
> > >> > > Thanks,
> > >> > > Rajiv
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
-- Guozhang

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
will only be included in 0.8.2.

Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug
is still open and there's a comment that moved it to 0.9 after the commit
was already made. Was the commit a mistake or did we just forget to close
it?

On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Here is the relevant stack trace:
>
> java.nio.channels.UnresolvedAddressException: null
>
>         at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
>
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
> ~[na:1.7.0_55]
>
>         at kafka.network.BlockingChannel.connect(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I am using 0.8.1. The source is here:
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> >
> > Here is the definition of disconnect():
> > private def disconnect() = {
> >     if(blockingChannel.isConnected) {
> >       debug("Disconnecting from " + host + ":" + port)
> >       blockingChannel.disconnect()
> >     }
> >   }
> > It checks if blockingChannel.isConnected before calling
> > blockingChannel.disconnect(). I think if there is an
> > UnresolvedAddressException, the isConnected is never set and the
> > blockingChannel.disconnect() is never called. But by this point we have
> > already created a socket and will leak it.
> >
> > The same problem might be present in the connect method of the
> > BlockingChannel at
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala
> .
> > Though its own disconnect method seems to check for both the connected:
> >
> > def disconnect() = lock synchronized {
> >     // My comment: connected will not be set if we get an
> > UnresolvedAddressException but channel should NOT  be null, so we will
> > probably still do the right thing.
> >     if(connected || channel != null) {
> >       // closing the main socket channel *should* close the read channel
> >       // but let's do it to be sure.
> >       swallow(channel.close())
> >       swallow(channel.socket.close())
> >       if(readChannel != null) swallow(readChannel.close())
> >       channel = null; readChannel = null; writeChannel = null
> >       connected = false
> >     }
> >   }
> >
> >
> >
> > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> >> Rajiv,
> >>
> >> Which version of Kafka are you using? I just checked SimpleConsumer's
> >> code,
> >> and in its close() function, disconnect() is called, which will close
> the
> >> socket.
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
> >> wrote:
> >>
> >> > Meant to write a run loop.
> >> >
> >> > void run() {
> >> >   while (running) {
> >> >     if (simpleConsumer == null) {
> >> >       simpleConsumer = new SimpleConsumer(host, port,
> >> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
> >> >     }
> >> >     try {
> >> >       // Do stuff with simpleConsumer.
> >> >     } catch (Exception e) {
> >> >       logger.error(e);  // Assume UnresolvedAddressException.
> >> >       if (consumer != null) {
> >> >         simpleConsumer.close();
> >> >         simpleConsumer = null;
> >> >       }
> >> >     }
> >> >   }
> >> > }
> >> >
> >> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> >> > wrote:
> >> >
> >> > > Here is my typical flow:
> >> > > void run() {
> >> > >   if (simpleConsumer == null) {
> >> > >     simpleConsumer = new SimpleConsumer(host, port, (int)
> >> > kafkaSocketTimeout,
> >> > > kafkaRExeiveBufferSize, clientName);
> >> > >   }
> >> > >   try {
> >> > >     // Do stuff with simpleConsumer.
> >> > >    } catch (Exception e) {
> >> > >      if (consumer != null) {
> >> > >        simpleConsumer.close();
> >> > >        simpleConsumer = null;
> >> > >      }
> >> > >   }
> >> > > }
> >> > >
> >> > > If there is a problem with the host name, or some DNS issues, we get
> >> an
> >> > > UnresolvedAddressException as expected and attempt to close the
> >> > > simpleConsumer. However this does not really get rid of the
> underlying
> >> > > socket. So we end up leaking a FD every time this happens. Though
> >> this is
> >> > > not a common case I think there needs to be a way on the
> >> SimpleConsumer
> >> > to
> >> > > get rid of all OS resources that it is holding onto. Right now if
> this
> >> > > keeps happening the number of FDs being consumed by the process
> keeps
> >> > > increasing till we hit the OS limits. As a user I cannot do anything
> >> else
> >> > > but call simpleConsumer.close(). We need to be able to close the
> >> > underlying
> >> > > socketChannel/socket when this kind of an error happens.
> >> > >
> >> > > To reproduce, one can just run this code but just put in any garbage
> >> host
> >> > > name, running lsof -p while running this will show that the open FDs
> >> > > increases without limit.
> >> > >
> >> > > Thanks,
> >> > > Rajiv
> >> > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
Thanks,
Ewen

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Here is the relevant stack trace:

java.nio.channels.UnresolvedAddressException: null
        at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_55]
        at kafka.network.BlockingChannel.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> I am using 0.8.1. The source is here:
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
>
> Here is the definition of disconnect():
> private def disconnect() = {
>     if(blockingChannel.isConnected) {
>       debug("Disconnecting from " + host + ":" + port)
>       blockingChannel.disconnect()
>     }
>   }
> It checks if blockingChannel.isConnected before calling
> blockingChannel.disconnect(). I think if there is an
> UnresolvedAddressException, the isConnected is never set and the
> blockingChannel.disconnect() is never called. But by this point we have
> already created a socket and will leak it.
>
> The same problem might be present in the connect method of the
> BlockingChannel at
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala.
> Though its own disconnect method seems to check for both the connected:
>
> def disconnect() = lock synchronized {
>     // My comment: connected will not be set if we get an
> UnresolvedAddressException but channel should NOT  be null, so we will
> probably still do the right thing.
>     if(connected || channel != null) {
>       // closing the main socket channel *should* close the read channel
>       // but let's do it to be sure.
>       swallow(channel.close())
>       swallow(channel.socket.close())
>       if(readChannel != null) swallow(readChannel.close())
>       channel = null; readChannel = null; writeChannel = null
>       connected = false
>     }
>   }
>
>
>
> On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
>> Rajiv,
>>
>> Which version of Kafka are you using? I just checked SimpleConsumer's
>> code,
>> and in its close() function, disconnect() is called, which will close the
>> socket.
>>
>> Guozhang
>>
>>
>> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
>> wrote:
>>
>> > Meant to write a run loop.
>> >
>> > void run() {
>> >   while (running) {
>> >     if (simpleConsumer == null) {
>> >       simpleConsumer = new SimpleConsumer(host, port,
>> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
>> >     }
>> >     try {
>> >       // Do stuff with simpleConsumer.
>> >     } catch (Exception e) {
>> >       logger.error(e);  // Assume UnresolvedAddressException.
>> >       if (consumer != null) {
>> >         simpleConsumer.close();
>> >         simpleConsumer = null;
>> >       }
>> >     }
>> >   }
>> > }
>> >
>> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
>> > wrote:
>> >
>> > > Here is my typical flow:
>> > > void run() {
>> > >   if (simpleConsumer == null) {
>> > >     simpleConsumer = new SimpleConsumer(host, port, (int)
>> > kafkaSocketTimeout,
>> > > kafkaRExeiveBufferSize, clientName);
>> > >   }
>> > >   try {
>> > >     // Do stuff with simpleConsumer.
>> > >    } catch (Exception e) {
>> > >      if (consumer != null) {
>> > >        simpleConsumer.close();
>> > >        simpleConsumer = null;
>> > >      }
>> > >   }
>> > > }
>> > >
>> > > If there is a problem with the host name, or some DNS issues, we get
>> an
>> > > UnresolvedAddressException as expected and attempt to close the
>> > > simpleConsumer. However this does not really get rid of the underlying
>> > > socket. So we end up leaking a FD every time this happens. Though
>> this is
>> > > not a common case I think there needs to be a way on the
>> SimpleConsumer
>> > to
>> > > get rid of all OS resources that it is holding onto. Right now if this
>> > > keeps happening the number of FDs being consumed by the process keeps
>> > > increasing till we hit the OS limits. As a user I cannot do anything
>> else
>> > > but call simpleConsumer.close(). We need to be able to close the
>> > underlying
>> > > socketChannel/socket when this kind of an error happens.
>> > >
>> > > To reproduce, one can just run this code but just put in any garbage
>> host
>> > > name, running lsof -p while running this will show that the open FDs
>> > > increases without limit.
>> > >
>> > > Thanks,
>> > > Rajiv
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

Posted by Rajiv Kurian <ra...@signalfuse.com>.
I am using 0.8.1. The source is here:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala

Here is the definition of disconnect():
private def disconnect() = {
    if(blockingChannel.isConnected) {
      debug("Disconnecting from " + host + ":" + port)
      blockingChannel.disconnect()
    }
  }
It checks blockingChannel.isConnected before calling
blockingChannel.disconnect(). If there is an UnresolvedAddressException,
isConnected is never set, so blockingChannel.disconnect() is never called.
But by this point we have already created a socket, and it leaks.
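
The state that the isConnected guard misses is easy to demonstrate with
plain NIO, independent of Kafka: SocketChannel.open() allocates the FD
before connect() ever runs, and an UnresolvedAddressException from
connect() leaves the channel open but not connected:

```java
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class LeakDemo {
    public static void main(String[] args) throws Exception {
        SocketChannel channel = SocketChannel.open(); // FD allocated here
        try {
            // ".invalid" is reserved and never resolves, so connect()
            // throws UnresolvedAddressException before any I/O happens.
            channel.connect(new InetSocketAddress("broker.invalid", 9092));
        } catch (UnresolvedAddressException e) {
            // The channel never became connected...
            System.out.println("connected: " + channel.isConnected()); // false
            // ...but its FD is still open; a guard on isConnected() skips it.
            System.out.println("open: " + channel.isOpen());           // true
        } finally {
            channel.close(); // the only way to release the FD
        }
    }
}
```

So a correct cleanup has to close the channel unconditionally, not only
when isConnected is true.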

The same problem might be present in the connect method of BlockingChannel
at
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala.
Its own disconnect method, though, checks both the connected flag and
whether the channel is null:

def disconnect() = lock synchronized {
  // My comment: connected will not be set if we get an
  // UnresolvedAddressException, but channel should NOT be null, so we will
  // probably still do the right thing.
  if(connected || channel != null) {
    // closing the main socket channel *should* close the read channel
    // but let's do it to be sure.
    swallow(channel.close())
    swallow(channel.socket.close())
    if(readChannel != null) swallow(readChannel.close())
    channel = null; readChannel = null; writeChannel = null
    connected = false
  }
}



On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Rajiv,
>
> Which version of Kafka are you using? I just checked SimpleConsumer's code,
> and in its close() function, disconnect() is called, which will close the
> socket.
>
> Guozhang
>
>
> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Meant to write a run loop.
> >
> > void run() {
> >   while (running) {
> >     if (simpleConsumer == null) {
> >       simpleConsumer = new SimpleConsumer(host, port,
> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
> >     }
> >     try {
> >       // Do stuff with simpleConsumer.
> >     } catch (Exception e) {
> >       logger.error(e);  // Assume UnresolvedAddressException.
> >       if (consumer != null) {
> >         simpleConsumer.close();
> >         simpleConsumer = null;
> >       }
> >     }
> >   }
> > }
> >
> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Here is my typical flow:
> > > void run() {
> > >   if (simpleConsumer == null) {
> > >     simpleConsumer = new SimpleConsumer(host, port, (int)
> > kafkaSocketTimeout,
> > > kafkaRExeiveBufferSize, clientName);
> > >   }
> > >   try {
> > >     // Do stuff with simpleConsumer.
> > >    } catch (Exception e) {
> > >      if (consumer != null) {
> > >        simpleConsumer.close();
> > >        simpleConsumer = null;
> > >      }
> > >   }
> > > }
> > >
> > > If there is a problem with the host name, or some DNS issues, we get an
> > > UnresolvedAddressException as expected and attempt to close the
> > > simpleConsumer. However this does not really get rid of the underlying
> > > socket. So we end up leaking a FD every time this happens. Though this
> is
> > > not a common case I think there needs to be a way on the SimpleConsumer
> > to
> > > get rid of all OS resources that it is holding onto. Right now if this
> > > keeps happening the number of FDs being consumed by the process keeps
> > > increasing till we hit the OS limits. As a user I cannot do anything
> else
> > > but call simpleConsumer.close(). We need to be able to close the
> > underlying
> > > socketChannel/socket when this kind of an error happens.
> > >
> > > To reproduce, one can just run this code but just put in any garbage
> host
> > > name, running lsof -p while running this will show that the open FDs
> > > increases without limit.
> > >
> > > Thanks,
> > > Rajiv
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

Posted by Guozhang Wang <wa...@gmail.com>.
Rajiv,

Which version of Kafka are you using? I just checked SimpleConsumer's code,
and in its close() function, disconnect() is called, which will close the
socket.

Guozhang


On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Meant to write a run loop.
>
> void run() {
>   while (running) {
>     if (simpleConsumer == null) {
>       simpleConsumer = new SimpleConsumer(host, port,
> (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
>     }
>     try {
>       // Do stuff with simpleConsumer.
>     } catch (Exception e) {
>       logger.error(e);  // Assume UnresolvedAddressException.
>       if (consumer != null) {
>         simpleConsumer.close();
>         simpleConsumer = null;
>       }
>     }
>   }
> }
>
> On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Here is my typical flow:
> > void run() {
> >   if (simpleConsumer == null) {
> >     simpleConsumer = new SimpleConsumer(host, port, (int)
> kafkaSocketTimeout,
> > kafkaRExeiveBufferSize, clientName);
> >   }
> >   try {
> >     // Do stuff with simpleConsumer.
> >    } catch (Exception e) {
> >      if (consumer != null) {
> >        simpleConsumer.close();
> >        simpleConsumer = null;
> >      }
> >   }
> > }
> >
> > If there is a problem with the host name, or some DNS issues, we get an
> > UnresolvedAddressException as expected and attempt to close the
> > simpleConsumer. However this does not really get rid of the underlying
> > socket. So we end up leaking a FD every time this happens. Though this is
> > not a common case I think there needs to be a way on the SimpleConsumer
> to
> > get rid of all OS resources that it is holding onto. Right now if this
> > keeps happening the number of FDs being consumed by the process keeps
> > increasing till we hit the OS limits. As a user I cannot do anything else
> > but call simpleConsumer.close(). We need to be able to close the
> underlying
> > socketChannel/socket when this kind of an error happens.
> >
> > To reproduce, one can just run this code but just put in any garbage host
> > name, running lsof -p while running this will show that the open FDs
> > increases without limit.
> >
> > Thanks,
> > Rajiv
> >
> >
>



-- 
-- Guozhang

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Meant to write a run loop.

void run() {
  while (running) {
    if (simpleConsumer == null) {
      simpleConsumer = new SimpleConsumer(host, port,
          (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
    }
    try {
      // Do stuff with simpleConsumer.
    } catch (Exception e) {
      logger.error(e);  // Assume UnresolvedAddressException.
      if (simpleConsumer != null) {
        simpleConsumer.close();
        simpleConsumer = null;
      }
    }
  }
}
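
One side effect of this loop worth noting: when the host stays unresolvable,
the loop fails and retries at full speed, so on the affected versions it
leaks one FD per iteration as fast as it can spin. A small backoff between
failures (my own sketch, not from the thread) slows the bleed and is good
practice regardless of the leak. The cap of 30 seconds is an arbitrary
choice:

```java
public class Backoff {
    // Doubles the delay after each consecutive failure, capped at 30s:
    // 100ms, 200ms, 400ms, ..., 25600ms, 30000ms, 30000ms, ...
    static long next(long currentMs) {
        return Math.min(currentMs * 2, 30_000);
    }

    public static void main(String[] args) {
        long delay = 100;
        for (int i = 0; i < 12; i++) {
            System.out.print(delay + " ");
            delay = next(delay);
        }
    }
}
```

In the run loop above, sleep for the current delay in the catch block and
call next() to grow it; reset the delay to 100 ms after a successful fetch.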

On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Here is my typical flow:
> void run() {
>   if (simpleConsumer == null) {
>     simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout,
> kafkaRExeiveBufferSize, clientName);
>   }
>   try {
>     // Do stuff with simpleConsumer.
>    } catch (Exception e) {
>      if (consumer != null) {
>        simpleConsumer.close();
>        simpleConsumer = null;
>      }
>   }
> }
>
> If there is a problem with the host name, or some DNS issues, we get an
> UnresolvedAddressException as expected and attempt to close the
> simpleConsumer. However this does not really get rid of the underlying
> socket. So we end up leaking a FD every time this happens. Though this is
> not a common case I think there needs to be a way on the SimpleConsumer to
> get rid of all OS resources that it is holding onto. Right now if this
> keeps happening the number of FDs being consumed by the process keeps
> increasing till we hit the OS limits. As a user I cannot do anything else
> but call simpleConsumer.close(). We need to be able to close the underlying
> socketChannel/socket when this kind of an error happens.
>
> To reproduce, one can just run this code but just put in any garbage host
> name, running lsof -p while running this will show that the open FDs
> increases without limit.
>
> Thanks,
> Rajiv
>
>