You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/07 19:44:02 UTC
kafka git commit: kafka-1738;
Partitions for topic not created after restart from forced shutdown;
patched by Jun Rao; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/0.8.2 2ec356fa9 -> 80eb04f99
kafka-1738; Partitions for topic not created after restart from forced shutdown; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/80eb04f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/80eb04f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/80eb04f9
Branch: refs/heads/0.8.2
Commit: 80eb04f994963c1c6b5e3f0e8742b81ff140fc65
Parents: 2ec356f
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Nov 7 10:43:54 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 7 10:43:54 2014 -0800
----------------------------------------------------------------------
.../main/scala/kafka/controller/ControllerChannelManager.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/80eb04f9/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecbfa0f..eb492f0 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -130,10 +130,11 @@ class RequestSendThread(val controllerId: Int,
// removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
try {
channel.send(request)
+ receive = channel.receive()
isSendSuccessful = true
} catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
- error(("Controller %d epoch %d failed to send request %s to broker %s. " +
+ warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
"Reconnecting to broker.").format(controllerId, controllerContext.epoch,
request.toString, toBroker.toString()), e)
channel.disconnect()
@@ -143,7 +144,6 @@ class RequestSendThread(val controllerId: Int,
Utils.swallow(Thread.sleep(300))
}
}
- receive = channel.receive()
var response: RequestOrResponse = null
request.requestId.get match {
case RequestKeys.LeaderAndIsrKey =>
@@ -162,7 +162,7 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e: Throwable =>
- warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
+ error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
// If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
channel.disconnect()
}