You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/12/12 01:34:50 UTC
git commit: KAFKA-1134 onControllerFailover function should be synchronized with other functions; reviewed by Neha Narkhede
Updated Branches:
refs/heads/trunk 169a79e68 -> dd58d753c
KAFKA-1134 onControllerFailover function should be synchronized with other functions; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd58d753
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd58d753
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd58d753
Branch: refs/heads/trunk
Commit: dd58d753ce3ffb41776a6fa6322cb822f2222500
Parents: 169a79e
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Dec 11 16:34:34 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Dec 11 16:34:34 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/controller/ControllerChannelManager.scala | 1 +
core/src/main/scala/kafka/controller/KafkaController.scala | 1 +
core/src/main/scala/kafka/controller/PartitionStateMachine.scala | 3 ++-
3 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd58d753/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 4c121e4..7991e42 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -90,6 +90,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
private def removeExistingBroker(brokerId: Int) {
try {
brokerStateInfo(brokerId).channel.disconnect()
+ brokerStateInfo(brokerId).messageQueue.clear()
brokerStateInfo(brokerId).requestSendThread.shutdown()
brokerStateInfo.remove(brokerId)
}catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd58d753/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a1f7ff4..965d0e5 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -893,6 +893,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
*/
@throws(classOf[Exception])
def handleNewSession() {
+ info("ZK expired; shut down all controller components and try to re-elect")
controllerContext.controllerLock synchronized {
Utils.unregisterMBean(KafkaController.MBeanName)
partitionStateMachine.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd58d753/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 829163a..5859ce7 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -401,7 +401,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
- controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
+ if (partitionsRemainingToBeAdded.size > 0)
+ controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
} catch {
case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
}