You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2014/02/25 09:27:22 UTC

[12/19] git commit: use zk for auto rebalance

use zk for auto rebalance


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/425af9b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/425af9b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/425af9b4

Branch: refs/heads/trunk
Commit: 425af9b4a7afeb21191c33ba6bc3f20623a3f0b3
Parents: 55d77c6
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Fri Dec 20 11:11:20 2013 -0800
Committer: Sriram Subramanian <sr...@gmail.com>
Committed: Fri Dec 20 11:11:20 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 48 ++++++++++++--------
 1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/425af9b4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 522e6c7..74e2ea4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,7 +21,7 @@ import collection.immutable.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
-import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.admin.{AdminOperationException, PreferredReplicaLeaderElectionCommand}
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
@@ -945,29 +945,39 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           var imbalanceRatio: Double = 0
           var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
           controllerContext.controllerLock synchronized {
-            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                controllerContext.partitionsBeingReassigned.size == 0) {
-              // do this check only if the broker is live and there are no partitions being reassigned currently
-              topicsNotInPreferredReplica =
-                topicAndPartitionsForBroker.filter {
-                  case(topicPartition, replicas) => {
-                    controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
-                  }
+            topicsNotInPreferredReplica =
+              topicAndPartitionsForBroker.filter {
+                case(topicPartition, replicas) => {
+                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                 }
-              debug("topics not in preferred replica " + topicsNotInPreferredReplica)
-              val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
-              val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
-              imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
-              trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
-            }
+              }
+            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+            val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+            trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
           }
           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
           // that need to be on this broker
           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            topicsNotInPreferredReplica.foreach {
-              case(topicPartition, replicas) => {
-                controllerContext.controllerLock synchronized {
-                  onPreferredReplicaElection(Set(topicPartition), false)
+            controllerContext.controllerLock synchronized {
+              // do this check only if the broker is live and there are no partitions being reassigned currently
+              // and preferred replica election is not in progress
+              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                  controllerContext.partitionsBeingReassigned.size == 0 &&
+                  controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
+                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+                val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+                try {
+                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+                  info("Created preferred replica election path with %s".format(jsonData))
+                } catch {
+                  case e2: Throwable =>
+                    val partitionsUndergoingPreferredReplicaElection =
+                      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+                    error("Preferred replica leader election currently in progress for " +
+                          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
                 }
               }
             }