You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/28 17:02:47 UTC
git commit: kafka-1041; Number of file handles increases indefinitely in producer if broker host is unresolvable; patched by Rajasekar Elango; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk f1a53b972 -> 8cdb234ad
kafka-1041; Number of file handles increases indefinitely in producer if broker host is unresolvable; patched by Rajasekar Elango; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8cdb234a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8cdb234a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8cdb234a
Branch: refs/heads/trunk
Commit: 8cdb234ad81ec324f15c9c1f8e484861926076f9
Parents: f1a53b9
Author: Rajasekar Elango <e....@gmail.com>
Authored: Fri Feb 28 08:02:42 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Feb 28 08:02:42 2014 -0800
----------------------------------------------------------------------
.../scala/kafka/network/BlockingChannel.scala | 48 +++++++++++---------
1 file changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8cdb234a/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index b894fa6..eb7bb14 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -45,28 +45,32 @@ class BlockingChannel( val host: String,
def connect() = lock synchronized {
if(!connected) {
- channel = SocketChannel.open()
- if(readBufferSize > 0)
- channel.socket.setReceiveBufferSize(readBufferSize)
- if(writeBufferSize > 0)
- channel.socket.setSendBufferSize(writeBufferSize)
- channel.configureBlocking(true)
- channel.socket.setSoTimeout(readTimeoutMs)
- channel.socket.setKeepAlive(true)
- channel.socket.setTcpNoDelay(true)
- channel.connect(new InetSocketAddress(host, port))
+ try {
+ channel = SocketChannel.open()
+ if(readBufferSize > 0)
+ channel.socket.setReceiveBufferSize(readBufferSize)
+ if(writeBufferSize > 0)
+ channel.socket.setSendBufferSize(writeBufferSize)
+ channel.configureBlocking(true)
+ channel.socket.setSoTimeout(readTimeoutMs)
+ channel.socket.setKeepAlive(true)
+ channel.socket.setTcpNoDelay(true)
+ channel.connect(new InetSocketAddress(host, port))
- writeChannel = channel
- readChannel = Channels.newChannel(channel.socket().getInputStream)
- connected = true
- // settings may not match what we requested above
- val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
- debug(msg.format(channel.socket.getSoTimeout,
- readTimeoutMs,
- channel.socket.getReceiveBufferSize,
- readBufferSize,
- channel.socket.getSendBufferSize,
- writeBufferSize))
+ writeChannel = channel
+ readChannel = Channels.newChannel(channel.socket().getInputStream)
+ connected = true
+ // settings may not match what we requested above
+ val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
+ debug(msg.format(channel.socket.getSoTimeout,
+ readTimeoutMs,
+ channel.socket.getReceiveBufferSize,
+ readBufferSize,
+ channel.socket.getSendBufferSize,
+ writeBufferSize))
+ } catch {
+ case e: Throwable => disconnect()
+ }
}
}
@@ -106,4 +110,4 @@ class BlockingChannel( val host: String,
response
}
-}
\ No newline at end of file
+}