You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2014/02/25 09:27:16 UTC

[06/19] git commit: Add auto leader rebalance support

Add auto leader rebalance support


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bcb4183
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bcb4183
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bcb4183

Branch: refs/heads/trunk
Commit: 5bcb41835f58be142bb6ac7c3155dfc163a516b4
Parents: 39c5b75
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Nov 19 17:03:39 2013 -0800
Committer: Sriram Subramanian <sr...@gmail.com>
Committed: Tue Nov 19 17:03:39 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 67 ++++++++++++++++++--
 .../main/scala/kafka/server/KafkaConfig.scala   | 12 ++++
 2 files changed, 74 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5bcb4183/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88792c2..d9d47c8 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,7 @@ import kafka.common._
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, ZkUtils, Logging}
+import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
@@ -36,6 +36,11 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.Some
 import kafka.common.TopicAndPartition
 import org.apache.log4j.Logger
+import scala.Some
+import kafka.common.TopicAndPartition
+import kafka.controller.ReassignedPartitionsContext
+import kafka.controller.PartitionAndReplica
+import kafka.controller.LeaderIsrAndControllerEpoch
 
 class ControllerContext(val zkClient: ZkClient,
                         val zkSessionTimeout: Int,
@@ -112,6 +117,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
     config.brokerId)
+  // have a separate scheduler for the controller to be able to start and stop independently of the
+  // kafka server
+  private val controllerScheduler = new KafkaScheduler(1)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -250,6 +258,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       initializeAndMaybeTriggerPreferredReplicaElection()
       /* send partition leadership info to all live brokers */
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+      if (config.autoLeaderRebalanceEnable) {
+        info("starting the partition rebalance scheduler")
+        controllerScheduler.startup()
+        controllerScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
+          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
+      }
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -456,7 +470,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], updateZk: Boolean = true) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -464,7 +478,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions)
+      removePartitionsFromPreferredReplicaElection(partitions, updateZk)
     }
   }
 
@@ -493,6 +507,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       isRunning = false
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
+      controllerScheduler.shutdown()
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
@@ -731,7 +746,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], updateZK : Boolean) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -742,7 +757,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
-    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+    if (updateZK)
+      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
@@ -898,6 +914,47 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       }
     }
   }
+
+  private def checkAndTriggerPartitionRebalance(): Unit = {
+    if (isActive()) {
+      info("checking need to trigger partition rebalance")
+      // snapshot, under the controller lock, every partition's replica assignment grouped by the first replica in its list
+      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null;
+      controllerContext.controllerLock synchronized {
+        preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy(_._2.head)
+      }
+      debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+      // for each such broker, compute its leader imbalance ratio and decide whether an election needs to be triggered
+      preferredReplicasForTopicsByBrokers.foreach( brokerInfo => {
+        var imbalanceRatio: Double = 0
+        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
+        controllerContext.controllerLock synchronized {
+          val brokerIds = controllerContext.liveBrokerIds
+          if (brokerIds.contains(brokerInfo._1) &&
+              controllerContext.partitionsBeingReassigned.size == 0) {
+            // only a live broker is checked, and only while no partitions are being reassigned; otherwise the ratio stays 0
+            topicsNotInPreferredReplica =
+              brokerInfo._2.filter(item => controllerContext.partitionLeadershipInfo(item._1).leaderAndIsr.leader != brokerInfo._1);
+            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+            val totalTopicPartitionsForBroker = brokerInfo._2.size
+            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+            info("leader imbalance ratio for broker %d is %f".format(brokerInfo._1, imbalanceRatio))
+          }
+        }
+        // if the ratio exceeds the configured percentage, run a preferred replica election for each partition
+        // not led by this broker, one partition at a time under the controller lock, passing false for updateZk
+        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+          topicsNotInPreferredReplica.foreach(topicPartition => {
+            controllerContext.controllerLock synchronized {
+              onPreferredReplicaElection(Set(topicPartition._1), false)
+            }
+          })
+        }
+      }
+      )
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bcb4183/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b324344..921f456 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -229,6 +229,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the purge interval (in number of requests) of the producer request purgatory */
   val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
 
+  /* Enables auto leader balancing. A background thread checks and triggers leader
+   * balance if required at regular intervals */
+  val autoLeaderRebalanceEnable = props.getBoolean("auto.leader.rebalance.enable", false)
+
+  /* the ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above
+   * this value per broker. The value is specified in percentage. */
+  val leaderImbalancePerBrokerPercentage = props.getInt("leader.imbalance.per.broker.percentage", 10)
+
+  /* the interval, in seconds, at which the controller triggers the partition rebalance check */
+  val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300)
+
+
   /*********** Controlled shutdown configuration ***********/
 
   /** Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens */