You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2014/02/25 09:27:16 UTC
[06/19] git commit: Add auto leader rebalance support
Add auto leader rebalance support
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bcb4183
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bcb4183
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bcb4183
Branch: refs/heads/trunk
Commit: 5bcb41835f58be142bb6ac7c3155dfc163a516b4
Parents: 39c5b75
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Nov 19 17:03:39 2013 -0800
Committer: Sriram Subramanian <sr...@gmail.com>
Committed: Tue Nov 19 17:03:39 2013 -0800
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 67 ++++++++++++++++++--
.../main/scala/kafka/server/KafkaConfig.scala | 12 ++++
2 files changed, 74 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bcb4183/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88792c2..d9d47c8 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,7 @@ import kafka.common._
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, ZkUtils, Logging}
+import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
@@ -36,6 +36,11 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.Some
import kafka.common.TopicAndPartition
import org.apache.log4j.Logger
+import scala.Some
+import kafka.common.TopicAndPartition
+import kafka.controller.ReassignedPartitionsContext
+import kafka.controller.PartitionAndReplica
+import kafka.controller.LeaderIsrAndControllerEpoch
class ControllerContext(val zkClient: ZkClient,
val zkSessionTimeout: Int,
@@ -112,6 +117,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
config.brokerId)
+ // have a separate scheduler for the controller to be able to start and stop independently of the
+ // kafka server
+ private val controllerScheduler = new KafkaScheduler(1)
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -250,6 +258,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
initializeAndMaybeTriggerPreferredReplicaElection()
/* send partition leadership info to all live brokers */
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+ if (config.autoLeaderRebalanceEnable) {
+ info("starting the partition rebalance scheduler")
+ controllerScheduler.startup()
+ controllerScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
+ 5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
+ }
}
else
info("Controller has been shut down, aborting startup/failover")
@@ -456,7 +470,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
- def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+ def onPreferredReplicaElection(partitions: Set[TopicAndPartition], updateZk: Boolean = true) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -464,7 +478,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
} catch {
case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
- removePartitionsFromPreferredReplicaElection(partitions)
+ removePartitionsFromPreferredReplicaElection(partitions, updateZk)
}
}
@@ -493,6 +507,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
isRunning = false
partitionStateMachine.shutdown()
replicaStateMachine.shutdown()
+ controllerScheduler.shutdown()
if(controllerContext.controllerChannelManager != null) {
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
@@ -731,7 +746,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
- def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+ def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], updateZK : Boolean) {
for(partition <- partitionsToBeRemoved) {
// check the status
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -742,7 +757,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
}
}
- ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+ if (updateZK)
+ ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
}
@@ -898,6 +914,47 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
}
+
+  /** Scheduled leader-balance check. Groups partitions by preferred (first) replica; for each live broker
+   *  whose share of such partitions led elsewhere exceeds leader.imbalance.per.broker.percentage (and no
+   *  reassignment is running), elects the preferred replica for them without deleting the admin ZK path. */
+  private def checkAndTriggerPartitionRebalance(): Unit = {
+    if (isActive()) {
+      info("checking need to trigger partition rebalance")
+      val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] =
+        controllerContext.controllerLock synchronized {
+          controllerContext.partitionReplicaAssignment.filter(_._2.nonEmpty).groupBy(_._2.head)
+        }
+      debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+      preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
+        // call both only under the controller lock; partitions without leadership info are never counted
+        def canRebalance = isActive() && controllerContext.liveBrokerIds.contains(leaderBroker) &&
+          controllerContext.partitionsBeingReassigned.isEmpty
+        def notLedByPreferred(partition: TopicAndPartition) =
+          controllerContext.partitionLeadershipInfo.get(partition).exists(_.leaderAndIsr.leader != leaderBroker)
+        val (imbalanceRatio, topicsNotInPreferredReplica): (Double, Map[TopicAndPartition, Seq[Int]]) =
+          controllerContext.controllerLock synchronized {
+            if (!canRebalance) (0.0, Map.empty[TopicAndPartition, Seq[Int]])
+            else {
+              val notLed = topicAndPartitionsForBroker.filter { case (partition, _) => notLedByPreferred(partition) }
+              debug("topics not in preferred replica " + notLed)
+              val ratio = notLed.size.toDouble / topicAndPartitionsForBroker.size
+              info("leader imbalance ratio for broker %d is %f".format(leaderBroker, ratio))
+              (ratio, notLed)
+            }
+          }
+        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+          topicsNotInPreferredReplica.keys.foreach { partition =>
+            controllerContext.controllerLock synchronized {
+              // the lock was released after the check above: re-validate before electing
+              if (canRebalance && notLedByPreferred(partition))
+                onPreferredReplicaElection(Set(partition), updateZk = false)
+            }
+          }
+        }
+      }
+    }
+  }
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bcb4183/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b324344..921f456 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -229,6 +229,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the purge interval (in number of requests) of the producer request purgatory */
val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
+  /* Enables auto leader balancing. A background thread checks and triggers leader
+   * balance if required at regular intervals */
+  val autoLeaderRebalanceEnable = props.getBoolean("auto.leader.rebalance.enable", false)
+
+  /* the leader imbalance allowed per broker, as a percentage in [0, 100]. The controller triggers a leader
+   * balance for a broker when that broker's imbalance ratio goes above this value. */
+  val leaderImbalancePerBrokerPercentage = props.getIntInRange("leader.imbalance.per.broker.percentage", 10, (0, 100))
+
+  /* the period, in seconds (at least 1), with which the controller runs the partition rebalance check */
+  val leaderImbalanceCheckIntervalSeconds = props.getIntInRange("leader.imbalance.check.interval.seconds", 300, (1, Int.MaxValue))
+
+
/*********** Controlled shutdown configuration ***********/
/** Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens */