Posted to commits@kafka.apache.org by ne...@apache.org on 2013/12/12 01:34:50 UTC

git commit: KAFKA-1134 onControllerFailover function should be synchronized with other functions; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/trunk 169a79e68 -> dd58d753c


KAFKA-1134 onControllerFailover function should be synchronized with other functions; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd58d753
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd58d753
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd58d753

Branch: refs/heads/trunk
Commit: dd58d753ce3ffb41776a6fa6322cb822f2222500
Parents: 169a79e
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Dec 11 16:34:34 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Dec 11 16:34:34 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/controller/ControllerChannelManager.scala    | 1 +
 core/src/main/scala/kafka/controller/KafkaController.scala        | 1 +
 core/src/main/scala/kafka/controller/PartitionStateMachine.scala  | 3 ++-
 3 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd58d753/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 4c121e4..7991e42 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -90,6 +90,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
   private def removeExistingBroker(brokerId: Int) {
     try {
       brokerStateInfo(brokerId).channel.disconnect()
+      brokerStateInfo(brokerId).messageQueue.clear()
       brokerStateInfo(brokerId).requestSendThread.shutdown()
       brokerStateInfo.remove(brokerId)
     }catch {
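
A note on the hunk above: the new line drops any requests still queued for a broker that is being removed, so they are not left behind when its sender thread is stopped. Below is a minimal, self-contained sketch of that disconnect, clear, shutdown ordering; the SenderThread and BrokerRemovalSketch names are hypothetical stand-ins, not the actual ControllerChannelManager/RequestSendThread types.

import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Hypothetical per-broker sender: drains a queue and "sends" each request.
class SenderThread(queue: LinkedBlockingQueue[String]) extends Thread {
  @volatile private var running = true
  private val done = new CountDownLatch(1)

  override def run(): Unit = {
    try {
      while (running) {
        val req = queue.poll(100, TimeUnit.MILLISECONDS)
        if (req != null)
          println(s"sending: $req") // a real sender would write to the broker channel here
      }
    } finally done.countDown()
  }

  def shutdown(): Unit = { running = false; done.await() }
}

object BrokerRemovalSketch extends App {
  val queue  = new LinkedBlockingQueue[String]()
  val sender = new SenderThread(queue)
  sender.start()

  queue.put("LeaderAndIsrRequest for broker 1")

  // Mirrors the ordering in the patched removeExistingBroker:
  // disconnect the channel (omitted here), drop stale queued requests,
  // then stop the per-broker sender thread.
  queue.clear()
  sender.shutdown()
}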

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd58d753/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a1f7ff4..965d0e5 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -893,6 +893,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
      */
     @throws(classOf[Exception])
     def handleNewSession() {
+      info("ZK expired; shut down all controller components and try to re-elect")
       controllerContext.controllerLock synchronized {
         Utils.unregisterMBean(KafkaController.MBeanName)
         partitionStateMachine.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd58d753/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 829163a..5859ce7 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -401,7 +401,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p =>
             !controllerContext.partitionReplicaAssignment.contains(p._1))
           info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
-          controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
+          if (partitionsRemainingToBeAdded.size > 0)
+            controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
         } catch {
           case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
         }