You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2014/02/25 09:27:22 UTC
[12/19] git commit: use zk for auto rebalance
use zk for auto rebalance
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/425af9b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/425af9b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/425af9b4
Branch: refs/heads/trunk
Commit: 425af9b4a7afeb21191c33ba6bc3f20623a3f0b3
Parents: 55d77c6
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Fri Dec 20 11:11:20 2013 -0800
Committer: Sriram Subramanian <sr...@gmail.com>
Committed: Fri Dec 20 11:11:20 2013 -0800
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 48 ++++++++++++--------
1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/425af9b4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 522e6c7..74e2ea4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,7 +21,7 @@ import collection.immutable.Set
import com.yammer.metrics.core.Gauge
import java.lang.{IllegalStateException, Object}
import java.util.concurrent.TimeUnit
-import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.admin.{AdminOperationException, PreferredReplicaLeaderElectionCommand}
import kafka.api._
import kafka.cluster.Broker
import kafka.common._
@@ -945,29 +945,39 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
var imbalanceRatio: Double = 0
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
controllerContext.controllerLock synchronized {
- if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
- controllerContext.partitionsBeingReassigned.size == 0) {
- // do this check only if the broker is live and there are no partitions being reassigned currently
- topicsNotInPreferredReplica =
- topicAndPartitionsForBroker.filter {
- case(topicPartition, replicas) => {
- controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
- }
+ topicsNotInPreferredReplica =
+ topicAndPartitionsForBroker.filter {
+ case(topicPartition, replicas) => {
+ controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
}
- debug("topics not in preferred replica " + topicsNotInPreferredReplica)
- val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
- val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
- imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
- trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
- }
+ }
+ debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+ val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+ val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+ imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+ trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
}
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
- topicsNotInPreferredReplica.foreach {
- case(topicPartition, replicas) => {
- controllerContext.controllerLock synchronized {
- onPreferredReplicaElection(Set(topicPartition), false)
+ controllerContext.controllerLock synchronized {
+ // do this check only if the broker is live and there are no partitions being reassigned currently
+ // and preferred replica election is not in progress
+ if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+ controllerContext.partitionsBeingReassigned.size == 0 &&
+ controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
+ val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+ val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+ val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+ try {
+ ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+ info("Created preferred replica election path with %s".format(jsonData))
+ } catch {
+ case e2: Throwable =>
+ val partitionsUndergoingPreferredReplicaElection =
+ PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+ error("Preferred replica leader election currently in progress for " +
+ "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
}
}
}