Posted to dev@kafka.apache.org by Lorenzo Alberton <l....@gmail.com> on 2012/08/08 00:45:52 UTC

Non-blocking socket when queue has no new data

Hi,

I have a question about non-blocking connections.

From the code, I see the request is handled by an acceptor thread that
passes it on to a Processor thread.
The connection by default is blocking, i.e. the consumer sends the fetch
request and then blocks until there's data available.
I understand that this is the intended behaviour, but I'm wondering
whether it's possible to establish a non-blocking connection that
returns immediately if there's no data available at the given offset
(assuming the offset is valid and points at the end of the queue).
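
To make the threading model concrete, here is a minimal Java sketch of an acceptor handing accepted connections to a fixed pool of processors. It assumes the hand-off happens once per connection, that processors are chosen round-robin, and that each processor drains its own queue; `ProcessorPool`, `handOff` and `poll` are hypothetical names, not Kafka's `SocketServer` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of an acceptor handing new connections to
// a fixed pool of processors in round-robin order (not Kafka's actual code).
final class ProcessorPool<C> {
    // One pending-connection queue per processor; a processor would drain its
    // own queue and register each connection with its own selector.
    private final List<ConcurrentLinkedQueue<C>> queues = new ArrayList<>();
    private int next = 0; // only touched by the single acceptor thread

    ProcessorPool(int processors) {
        if (processors <= 0) throw new IllegalArgumentException("processors must be > 0");
        for (int i = 0; i < processors; i++) queues.add(new ConcurrentLinkedQueue<>());
    }

    // Called by the acceptor for every accepted connection.
    // Returns the index of the processor that now owns the connection.
    int handOff(C connection) {
        int chosen = next;
        queues.get(chosen).add(connection);
        next = (next + 1) % queues.size();
        return chosen;
    }

    // Called by processor i to pick up a newly assigned connection, or null.
    C poll(int processor) {
        return queues.get(processor).poll();
    }
}
```

With three processors, connections `a`, `b`, `c`, `d` land on processors 0, 1, 2, 0. Giving each processor its own queue means only the owning processor ever touches a connection after the hand-off.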

We've been load-testing Kafka with a few thousand topics and many
short-lived consumers that fetch a limited number of messages before
closing the connection.
We have a socket timeout on the client side that closes the socket if
there's no data available, but the Kafka server doesn't close its end of
the socket until new data becomes available and a write() call is
attempted. When this happens, we see the following stack trace in the logs:


==============================================

kafka: INFO  [kafka.network.Processor] (kafka-processor-7) Closing socket
connection to <host>.
kafka: ERROR [kafka.network.Processor] (kafka-processor-5) Closing socket
for <host> because of error
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
at kafka.network.Processor.write(SocketServer.scala:339)
at kafka.network.Processor.run(SocketServer.scala:216)
at java.lang.Thread.run(Thread.java:662)

==============================================

Apart from the exception in the logs, the server-side file descriptor for
the socket is not released until a write() call is attempted, which limits
the number of connections that can be established.
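
The client-side timeout described above can be sketched with a plain `java.net.Socket`: `setSoTimeout` bounds each blocking `read()`, and the client closes the socket when a `SocketTimeoutException` fires. This is a minimal illustration that assumes a local server which accepts connections but never replies, standing in for a broker with no new data; `ClientTimeoutDemo` and its methods are hypothetical. Closing the socket here frees only the client's descriptor; the server's is released once the server itself notices the close.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

final class ClientTimeoutDemo {
    // Connects, waits up to timeoutMs for the first response byte, and
    // closes the socket if nothing arrives. Returns true if the read timed out.
    static boolean readTimesOut(InetAddress host, int port, int timeoutMs) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(timeoutMs); // bound each blocking read()
            InputStream in = socket.getInputStream();
            try {
                in.read(); // a real client would send a fetch request first
                return false;
            } catch (SocketTimeoutException e) {
                return true; // try-with-resources closes the client socket
            }
        }
    }

    // Local stand-in for a server that accepts connections but never writes.
    static boolean demo() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket silentServer = new ServerSocket(0, 50, loopback)) {
            return readTimesOut(loopback, silentServer.getLocalPort(), 200);
        }
    }
}
```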

I do realise that Kafka was designed for long-running processes that
maintain a persistent connection, but I see a lot of interest on the
mailing list in putting a REST interface in front of it, or otherwise
consuming data in chunks with short-lived processes, and we're very
interested in this scenario for one of our use cases as well.

So my question really is: is there a plan to support non-blocking
connections?
In non-blocking mode, the Response could be immediate and consist of the
header alone (an int32 response length, set to 0, followed by an int16
error code, which could also be set to 0 or to a new value indicating that
the operation would normally block).
We can probably contribute this feature if it isn't available yet and
others find it useful.
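
The proposed empty response can be sketched with `java.nio.ByteBuffer`, following the layout exactly as described: an int32 length of 0 followed by an int16 error code. `EmptyFetchResponse` and the `WOULD_BLOCK` value are hypothetical; whether the length field should also count the two error-code bytes would depend on how the existing response framing defines it.

```java
import java.nio.ByteBuffer;

// Sketch of the proposed "would block" fetch response header:
// int32 response length set to 0, then an int16 error code.
final class EmptyFetchResponse {
    static final short NO_ERROR = 0;
    static final short WOULD_BLOCK = 100; // hypothetical code, not defined by Kafka

    static final int HEADER_SIZE = 4 + 2; // int32 length + int16 error code

    // Server side: build the header, ready to be written to a channel.
    static ByteBuffer encode(short errorCode) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE); // big-endian by default
        buf.putInt(0);           // response length 0: no message set follows
        buf.putShort(errorCode);
        buf.flip();              // switch from filling to draining
        return buf;
    }

    // Client side: read the header and return its error code.
    static short decodeErrorCode(ByteBuffer buf) {
        int length = buf.getInt();
        short errorCode = buf.getShort();
        if (length != 0) throw new IllegalStateException("expected empty response, length=" + length);
        return errorCode;
    }
}
```

A client reading this header would tell "no data yet" apart from a real error by the code alone, without waiting for a message set.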

Another option (which I don't like as much) would be a server-side request
timeout, or a way to close the connection gracefully from the client side.
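
A server-side timeout could be sketched as a per-connection idle tracker that a processor consults periodically. This is a hypothetical helper (`IdleTracker`, `touch`, `forget` and `expire` are illustrative names), assuming the caller supplies the current time in milliseconds and closes whatever `expire` returns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical idle-timeout bookkeeping for one processor thread.
final class IdleTracker<C> {
    private final long idleTimeoutMs;
    private final Map<C, Long> lastActive = new HashMap<>();

    IdleTracker(long idleTimeoutMs) { this.idleTimeoutMs = idleTimeoutMs; }

    // Call on accept and whenever a read or write makes progress.
    void touch(C connection, long nowMs) { lastActive.put(connection, nowMs); }

    // Call when a connection is closed for any other reason.
    void forget(C connection) { lastActive.remove(connection); }

    // Removes and returns connections idle for longer than the timeout;
    // the caller would cancel each SelectionKey and close the channel.
    List<C> expire(long nowMs) {
        List<C> expired = new ArrayList<>();
        Iterator<Map.Entry<C, Long>> it = lastActive.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<C, Long> e = it.next();
            if (nowMs - e.getValue() > idleTimeoutMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

In a selector loop, calling `selector.select(timeoutMs)` instead of the untimed `select()` would let the processor wake up and run `expire` even when no I/O is pending. Passing the time in explicitly keeps the logic deterministic and easy to test.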

Thoughts?

Best regards,
-- 
Lorenzo Alberton
Chief Tech Architect
DataSift, Inc.

Re: Non-blocking socket when queue has no new data

Posted by Jun Rao <ju...@gmail.com>.
Lorenzo,

Currently, the consumer is blocking. However, I am not sure if that's
causing your problem. If you close the consumer socket connection, the
broker socket selector should receive an invalid key and close the socket.
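
For reference, in plain Java NIO an orderly close by the peer is usually reported to the selector as a readable key whose `read()` returns -1; the server then has to close the channel itself, which cancels the key and releases the descriptor. The sketch below demonstrates this over loopback; it is generic NIO rather than Kafka's `SocketServer`, and `PeerCloseDemo` is a hypothetical name. It assumes the accepted channel is registered for `OP_READ`: a channel with no read interest is not reported as readable when the peer goes away, so the close typically surfaces only as an error on a later write.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class PeerCloseDemo {
    // Returns true if the server observed the client's close as
    // end-of-stream (read() == -1) on a channel registered for OP_READ.
    static boolean serverSeesClose() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(loopback, 0));
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            Socket client = new Socket(loopback, port); // stands in for the consumer
            SocketChannel conn = server.accept();       // blocking accept
            conn.configureBlocking(false);
            // The selector only reports the close because OP_READ is registered.
            SelectionKey key = conn.register(selector, SelectionKey.OP_READ);

            client.close(); // consumer gives up, e.g. after its socket timeout

            if (selector.select(2000) == 0) { conn.close(); return false; }
            selector.selectedKeys().clear();
            int n = conn.read(ByteBuffer.allocate(64));
            if (n == -1) {    // orderly close from the peer
                key.cancel();
                conn.close(); // releases the server-side file descriptor
                return true;
            }
            conn.close();
            return false;
        }
    }
}
```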

Thanks

Jun
