You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/01/03 20:10:47 UTC
[2/2] git commit: Rebased from trunk to resolve conflicts between KAFKA-1185 and KAFKA-930
Rebased from trunk to resolve conflicts between KAFKA-1185 and KAFKA-930
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a119f532
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a119f532
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a119f532
Branch: refs/heads/trunk
Commit: a119f532c8b310122d85391efe11fd26027ef7f9
Parents: 10fa200 b23cf19
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Jan 3 11:06:04 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Jan 3 11:06:04 2014 -0800
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 102 +++++++++++++++++--
.../main/scala/kafka/server/KafkaConfig.scala | 12 +++
2 files changed, 108 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a119f532/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/controller/KafkaController.scala
index 2fcc36d,ca2f09b..6215cb8
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@@ -110,8 -110,11 +110,11 @@@ class KafkaController(val config : Kafk
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
private val partitionStateMachine = new PartitionStateMachine(this)
private val replicaStateMachine = new ReplicaStateMachine(this)
- private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
- onControllerFailover, onControllerResignation, config.brokerId)
+ private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
- config.brokerId)
++ onControllerResignation, config.brokerId)
+ // have a separate scheduler for the controller to be able to start and stop independently of the
+ // kafka server
+ private val autoRebalanceScheduler = new KafkaScheduler(1)
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@@ -910,8 -921,77 +937,71 @@@
@throws(classOf[Exception])
def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
- onControllerResignation()
- controllerElector.elect
+ controllerContext.controllerLock synchronized {
- Utils.unregisterMBean(KafkaController.MBeanName)
- partitionStateMachine.shutdown()
- replicaStateMachine.shutdown()
- if(controllerContext.controllerChannelManager != null) {
- controllerContext.controllerChannelManager.shutdown()
- controllerContext.controllerChannelManager = null
- }
++ onControllerResignation()
+ controllerElector.elect
+ }
+ }
+ }
+
+ private def checkAndTriggerPartitionRebalance(): Unit = {
+ if (isActive()) {
+ trace("checking need to trigger partition rebalance")
+ // get all the active brokers
+ var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
+ controllerContext.controllerLock synchronized {
+ preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy {
+ case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+ }
+ }
+ debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+ // for each broker, check if a preferred replica election needs to be triggered
+ preferredReplicasForTopicsByBrokers.foreach {
+ case(leaderBroker, topicAndPartitionsForBroker) => {
+ var imbalanceRatio: Double = 0
+ var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
+ controllerContext.controllerLock synchronized {
+ topicsNotInPreferredReplica =
+ topicAndPartitionsForBroker.filter {
+ case(topicPartition, replicas) => {
+ controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+ }
+ }
+ debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+ val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+ val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+ imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+ trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
+ }
+ // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
+ // that need to be on this broker
+ if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+ controllerContext.controllerLock synchronized {
+ // do this check only if the broker is live and there are no partitions being reassigned currently
+ // and preferred replica election is not in progress
+ if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+ controllerContext.partitionsBeingReassigned.size == 0 &&
+ controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
+ val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+ val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+ val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+ try {
+ ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+ info("Created preferred replica election path with %s".format(jsonData))
+ } catch {
+ case e2: ZkNodeExistsException =>
+ val partitionsUndergoingPreferredReplicaElection =
+ PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+ error("Preferred replica leader election currently in progress for " +
+ "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
+ case e3: Throwable =>
+ error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+ }
+ }
+ }
+ }
+ }
+ }
}
}
}