You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/01/03 20:10:46 UTC
[1/2] git commit: KAFKA-1185 Improve leader elector module to have a resign API; reviewed by Guozhang Wang and Jun Rao
Updated Branches:
refs/heads/trunk b23cf1968 -> a119f532c
KAFKA-1185 Improve leader elector module to have a resign API; reviewed by Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10fa2000
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10fa2000
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10fa2000
Branch: refs/heads/trunk
Commit: 10fa20001dd22a2cfc7da065d75d7cf6c0009b42
Parents: b5d1687
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Dec 20 14:39:03 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Dec 20 14:39:03 2013 -0800
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 32 ++++++++++-------
.../kafka/server/ZookeeperLeaderElector.scala | 36 +++++++++++---------
2 files changed, 40 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/10fa2000/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 965d0e5..2fcc36d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -110,8 +110,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
private val partitionStateMachine = new PartitionStateMachine(this)
private val replicaStateMachine = new ReplicaStateMachine(this)
- private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
- config.brokerId)
+ private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
+ onControllerFailover, onControllerResignation, config.brokerId)
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -256,6 +256,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
/**
+ * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
+ * required to clean up internal controller data structures
+ */
+ def onControllerResignation() {
+ controllerContext.controllerLock synchronized {
+ Utils.unregisterMBean(KafkaController.MBeanName)
+ partitionStateMachine.shutdown()
+ replicaStateMachine.shutdown()
+ if(controllerContext.controllerChannelManager != null) {
+ controllerContext.controllerChannelManager.shutdown()
+ controllerContext.controllerChannelManager = null
+ }
+ }
+ }
+
+ /**
* Returns true if this broker is the current controller.
*/
def isActive(): Boolean = {
@@ -894,16 +910,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
@throws(classOf[Exception])
def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
- controllerContext.controllerLock synchronized {
- Utils.unregisterMBean(KafkaController.MBeanName)
- partitionStateMachine.shutdown()
- replicaStateMachine.shutdown()
- if(controllerContext.controllerChannelManager != null) {
- controllerContext.controllerChannelManager.shutdown()
- controllerContext.controllerChannelManager = null
- }
- controllerElector.elect
- }
+ onControllerResignation()
+ controllerElector.elect
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/10fa2000/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index cc6f1eb..b189619 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -30,7 +30,10 @@ import kafka.common.KafkaException
* leader is dead, this class will handle automatic re-election and if it succeeds, it invokes the leader state change
* callback
*/
-class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit,
+class ZookeeperLeaderElector(controllerContext: ControllerContext,
+ electionPath: String,
+ onBecomingLeader: () => Unit,
+ onResigningAsLeader: () => Unit,
brokerId: Int)
extends LeaderElector with Logging {
var leaderId = -1
@@ -58,23 +61,22 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
info(brokerId + " successfully elected as leader")
leaderId = brokerId
onBecomingLeader()
- } catch {
- case e: ZkNodeExistsException =>
- // If someone else has written the path, then
- leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
- case Some(controller) => KafkaController.parseControllerId(controller)
- case None => {
- warn("A leader has been elected but just resigned, this will result in another round of election")
- -1
- }
+ } catch {
+ case e: ZkNodeExistsException =>
+ // If someone else has written the path, then
+ leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+ case Some(controller) => KafkaController.parseControllerId(controller)
+ case None => {
+ warn("A leader has been elected but just resigned, this will result in another round of election")
+ -1
}
- if (leaderId != -1)
- debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
- case e2: Throwable =>
- error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
- leaderId = -1
+ }
+ if (leaderId != -1)
+ debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+ case e2: Throwable =>
+ error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+ resign()
}
-
amILeader
}
@@ -116,6 +118,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
controllerContext.controllerLock synchronized {
debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
.format(brokerId, dataPath))
+ if(amILeader)
+ onResigningAsLeader()
elect
}
}
[2/2] git commit: Rebased from trunk to resolve conflicts between KAFKA-1185 and KAFKA-930
Posted by ne...@apache.org.
Rebased from trunk to resolve conflicts between KAFKA-1185 and KAFKA-930
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a119f532
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a119f532
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a119f532
Branch: refs/heads/trunk
Commit: a119f532c8b310122d85391efe11fd26027ef7f9
Parents: 10fa200 b23cf19
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Jan 3 11:06:04 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Jan 3 11:06:04 2014 -0800
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 102 +++++++++++++++++--
.../main/scala/kafka/server/KafkaConfig.scala | 12 +++
2 files changed, 108 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a119f532/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/controller/KafkaController.scala
index 2fcc36d,ca2f09b..6215cb8
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@@ -110,8 -110,11 +110,11 @@@ class KafkaController(val config : Kafk
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
private val partitionStateMachine = new PartitionStateMachine(this)
private val replicaStateMachine = new ReplicaStateMachine(this)
- private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
- onControllerFailover, onControllerResignation, config.brokerId)
+ private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
- config.brokerId)
++ onControllerResignation, config.brokerId)
+ // have a separate scheduler for the controller to be able to start and stop independently of the
+ // kafka server
+ private val autoRebalanceScheduler = new KafkaScheduler(1)
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@@ -910,8 -921,77 +937,71 @@@
@throws(classOf[Exception])
def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
- onControllerResignation()
- controllerElector.elect
+ controllerContext.controllerLock synchronized {
- Utils.unregisterMBean(KafkaController.MBeanName)
- partitionStateMachine.shutdown()
- replicaStateMachine.shutdown()
- if(controllerContext.controllerChannelManager != null) {
- controllerContext.controllerChannelManager.shutdown()
- controllerContext.controllerChannelManager = null
- }
++ onControllerResignation()
+ controllerElector.elect
+ }
+ }
+ }
+
+ private def checkAndTriggerPartitionRebalance(): Unit = {
+ if (isActive()) {
+ trace("checking need to trigger partition rebalance")
+ // get all the active brokers
+ var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
+ controllerContext.controllerLock synchronized {
+ preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy {
+ case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+ }
+ }
+ debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+ // for each broker, check if a preferred replica election needs to be triggered
+ preferredReplicasForTopicsByBrokers.foreach {
+ case(leaderBroker, topicAndPartitionsForBroker) => {
+ var imbalanceRatio: Double = 0
+ var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
+ controllerContext.controllerLock synchronized {
+ topicsNotInPreferredReplica =
+ topicAndPartitionsForBroker.filter {
+ case(topicPartition, replicas) => {
+ controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+ }
+ }
+ debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+ val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+ val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+ imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+ trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
+ }
+ // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
+ // that need to be on this broker
+ if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+ controllerContext.controllerLock synchronized {
+ // do this check only if the broker is live and there are no partitions being reassigned currently
+ // and preferred replica election is not in progress
+ if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+ controllerContext.partitionsBeingReassigned.size == 0 &&
+ controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
+ val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+ val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+ val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+ try {
+ ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+ info("Created preferred replica election path with %s".format(jsonData))
+ } catch {
+ case e2: ZkNodeExistsException =>
+ val partitionsUndergoingPreferredReplicaElection =
+ PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+ error("Preferred replica leader election currently in progress for " +
+ "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
+ case e3: Throwable =>
+ error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+ }
+ }
+ }
+ }
+ }
+ }
}
}
}