You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2014/02/25 09:44:21 UTC
git commit: auto rebalance last commit
Repository: kafka
Updated Branches:
refs/heads/0.8.1 a2745382d -> b5971264f
auto rebalance last commit
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5971264
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5971264
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5971264
Branch: refs/heads/0.8.1
Commit: b5971264f29c6646bc543b47c58786f6322f0bd0
Parents: a274538
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Feb 25 00:36:48 2014 -0800
Committer: Sriram Subramanian <sr...@gmail.com>
Committed: Tue Feb 25 00:36:48 2014 -0800
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 44 +++++++++-----------
1 file changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b5971264/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 00a1f98..f12ffc2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -603,7 +603,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
- def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+ def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -612,7 +612,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
} catch {
case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
- removePartitionsFromPreferredReplicaElection(partitions)
+ removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
}
}
@@ -914,7 +914,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
- def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+ def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
+ isTriggeredByAutoRebalance : Boolean) {
for(partition <- partitionsToBeRemoved) {
// check the status
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -925,7 +926,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
}
}
- ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+ if (!isTriggeredByAutoRebalance)
+ ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
}
@@ -1090,6 +1092,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
topicsNotInPreferredReplica =
topicAndPartitionsForBroker.filter {
case(topicPartition, replicas) => {
+ controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
}
}
@@ -1102,26 +1105,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
- inLock(controllerContext.controllerLock) {
- // do this check only if the broker is live and there are no partitions being reassigned currently
- // and preferred replica election is not in progress
- if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
- controllerContext.partitionsBeingReassigned.size == 0 &&
- controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
- val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
- val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
- val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
- try {
- ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
- info("Created preferred replica election path with %s".format(jsonData))
- } catch {
- case e2: ZkNodeExistsException =>
- val partitionsUndergoingPreferredReplicaElection =
- PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
- error("Preferred replica leader election currently in progress for " +
- "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
- case e3: Throwable =>
- error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+ topicsNotInPreferredReplica.foreach {
+ case(topicPartition, replicas) => {
+ inLock(controllerContext.controllerLock) {
+ // do this check only if the broker is live and there are no partitions being reassigned currently
+ // and preferred replica election is not in progress
+ if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+ controllerContext.partitionsBeingReassigned.size == 0 &&
+ controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
+ !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+ !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
+ controllerContext.allTopics.contains(topicPartition.topic)) {
+ onPreferredReplicaElection(Set(topicPartition), false)
+ }
}
}
}