You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/05/23 01:23:35 UTC

git commit: kafka-907; controller needs to close socket channel to brokers on exception; patched by Jun Rao; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 e93937c88 -> 32cd8994b


kafka-907; controller needs to close socket channel to brokers on exception; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32cd8994
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32cd8994
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32cd8994

Branch: refs/heads/0.8
Commit: 32cd8994bf35b65a8053e0caea9a7710cc889df7
Parents: e93937c
Author: Jun Rao <ju...@gmail.com>
Authored: Wed May 22 16:23:21 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed May 22 16:23:21 2013 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala      |    6 ++++--
 1 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/32cd8994/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 0c41d1d..38b8674 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -122,6 +122,7 @@ class RequestSendThread(val controllerId: Int,
 
     try{
       lock synchronized {
+        channel.connect() // establish a socket connection if needed
         channel.send(request)
         receive = channel.receive()
         var response: RequestOrResponse = null
@@ -142,8 +143,9 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e =>
-        // log it and let it go. Let controller shut it down.
-        debug("Exception occurs", e)
+        warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
+        // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
+        channel.disconnect()
     }
   }
 }