You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/28 17:02:47 UTC

git commit: kafka-1041; Number of file handles increases indefinitely in producer if broker host is unresolvable; patched by Rajasekar Elango; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk f1a53b972 -> 8cdb234ad


kafka-1041; Number of file handles increases indefinitely in producer if broker host is unresolvable; patched by Rajasekar Elango; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8cdb234a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8cdb234a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8cdb234a

Branch: refs/heads/trunk
Commit: 8cdb234ad81ec324f15c9c1f8e484861926076f9
Parents: f1a53b9
Author: Rajasekar Elango <e....@gmail.com>
Authored: Fri Feb 28 08:02:42 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Feb 28 08:02:42 2014 -0800

----------------------------------------------------------------------
 .../scala/kafka/network/BlockingChannel.scala   | 48 +++++++++++---------
 1 file changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8cdb234a/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index b894fa6..eb7bb14 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -45,28 +45,32 @@ class BlockingChannel( val host: String,
   
   def connect() = lock synchronized  {
     if(!connected) {
-      channel = SocketChannel.open()
-      if(readBufferSize > 0)
-        channel.socket.setReceiveBufferSize(readBufferSize)
-      if(writeBufferSize > 0)
-        channel.socket.setSendBufferSize(writeBufferSize)
-      channel.configureBlocking(true)
-      channel.socket.setSoTimeout(readTimeoutMs)
-      channel.socket.setKeepAlive(true)
-      channel.socket.setTcpNoDelay(true)
-      channel.connect(new InetSocketAddress(host, port))
+      try {
+        channel = SocketChannel.open()
+        if(readBufferSize > 0)
+          channel.socket.setReceiveBufferSize(readBufferSize)
+        if(writeBufferSize > 0)
+          channel.socket.setSendBufferSize(writeBufferSize)
+        channel.configureBlocking(true)
+        channel.socket.setSoTimeout(readTimeoutMs)
+        channel.socket.setKeepAlive(true)
+        channel.socket.setTcpNoDelay(true)
+        channel.connect(new InetSocketAddress(host, port))
 
-      writeChannel = channel
-      readChannel = Channels.newChannel(channel.socket().getInputStream)
-      connected = true
-      // settings may not match what we requested above
-      val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
-      debug(msg.format(channel.socket.getSoTimeout,
-                       readTimeoutMs,
-                       channel.socket.getReceiveBufferSize, 
-                       readBufferSize,
-                       channel.socket.getSendBufferSize,
-                       writeBufferSize))
+        writeChannel = channel
+        readChannel = Channels.newChannel(channel.socket().getInputStream)
+        connected = true
+        // settings may not match what we requested above
+        val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
+        debug(msg.format(channel.socket.getSoTimeout,
+                         readTimeoutMs,
+                         channel.socket.getReceiveBufferSize, 
+                         readBufferSize,
+                         channel.socket.getSendBufferSize,
+                         writeBufferSize))
+      } catch {
+        case e: Throwable => disconnect()
+      }
     }
   }
   
@@ -106,4 +110,4 @@ class BlockingChannel( val host: String,
     response
   }
 
-}
\ No newline at end of file
+}