You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/05/23 01:23:35 UTC
git commit: kafka-907;
controller needs to close socket channel to brokers on exception ;
patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 e93937c88 -> 32cd8994b
kafka-907; controller needs to close socket channel to brokers on exception ; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32cd8994
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32cd8994
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32cd8994
Branch: refs/heads/0.8
Commit: 32cd8994bf35b65a8053e0caea9a7710cc889df7
Parents: e93937c
Author: Jun Rao <ju...@gmail.com>
Authored: Wed May 22 16:23:21 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed May 22 16:23:21 2013 -0700
----------------------------------------------------------------------
.../controller/ControllerChannelManager.scala | 6 ++++--
1 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32cd8994/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 0c41d1d..38b8674 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -122,6 +122,7 @@ class RequestSendThread(val controllerId: Int,
try{
lock synchronized {
+ channel.connect() // establish a socket connection if needed
channel.send(request)
receive = channel.receive()
var response: RequestOrResponse = null
@@ -142,8 +143,9 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e =>
- // log it and let it go. Let controller shut it down.
- debug("Exception occurs", e)
+ warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
+ // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
+ channel.disconnect()
}
}
}