You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/07 19:44:02 UTC

kafka git commit: kafka-1738; Partitions for topic not created after restart from forced shutdown; patched by Jun Rao; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/0.8.2 2ec356fa9 -> 80eb04f99


kafka-1738; Partitions for topic not created after restart from forced shutdown; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/80eb04f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/80eb04f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/80eb04f9

Branch: refs/heads/0.8.2
Commit: 80eb04f994963c1c6b5e3f0e8742b81ff140fc65
Parents: 2ec356f
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Nov 7 10:43:54 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 7 10:43:54 2014 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/controller/ControllerChannelManager.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/80eb04f9/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecbfa0f..eb492f0 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -130,10 +130,11 @@ class RequestSendThread(val controllerId: Int,
           // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
           try {
             channel.send(request)
+            receive = channel.receive()
             isSendSuccessful = true
           } catch {
             case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
-              error(("Controller %d epoch %d failed to send request %s to broker %s. " +
+              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                 request.toString, toBroker.toString()), e)
               channel.disconnect()
@@ -143,7 +144,6 @@ class RequestSendThread(val controllerId: Int,
               Utils.swallow(Thread.sleep(300))
           }
         }
-        receive = channel.receive()
         var response: RequestOrResponse = null
         request.requestId.get match {
           case RequestKeys.LeaderAndIsrKey =>
@@ -162,7 +162,7 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e: Throwable =>
-        warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
+        error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
         // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
         channel.disconnect()
     }