You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/28 16:51:11 UTC
git commit: KAFKA-828 Preferred Replica Election does not delete the
admin path on controller failover; reviewed by Neha Narkhede and Jun Rao
Updated Branches:
refs/heads/0.8 f570cce1f -> 2fe3f9fef
KAFKA-828 Preferred Replica Election does not delete the admin path on controller failover; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fe3f9fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fe3f9fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fe3f9fe
Branch: refs/heads/0.8
Commit: 2fe3f9fef6e641ed64aca654b607c4d398e11d25
Parents: f570cce
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Thu Mar 28 08:50:38 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Mar 28 08:51:00 2013 -0700
----------------------------------------------------------------------
.../PreferredReplicaLeaderElectionCommand.scala | 7 +--
.../scala/kafka/controller/KafkaController.scala | 31 ++++++++-------
.../kafka/controller/PartitionLeaderSelector.scala | 8 ++--
.../kafka/controller/PartitionStateMachine.scala | 3 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 17 +--------
.../test/scala/unit/kafka/admin/AdminTest.scala | 2 +-
6 files changed, 27 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 7405c5a..d5de5f3 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -54,7 +54,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
if (!options.has(jsonFileOpt))
ZkUtils.getAllPartitions(zkClient)
else
- parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
+ parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
@@ -69,7 +69,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}
- def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
+ def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = {
Json.parseFull(jsonString) match {
case Some(m) =>
m.asInstanceOf[Map[String, Any]].get("partitions") match {
@@ -101,8 +101,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
info("Created preferred replica election path with %s".format(jsonData))
} catch {
case nee: ZkNodeExistsException =>
- val partitionsUndergoingPreferredReplicaElection =
- PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
+ val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
throw new AdministrationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
case e2 => throw new AdministrationException(e2.toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 229239c..9d32901 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -386,8 +386,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
- controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
- partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+ try {
+ controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
+ partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+ } catch {
+ case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+ } finally {
+ removePartitionsFromPreferredReplicaElection(partitions)
+ }
}
/**
@@ -910,20 +916,15 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
*/
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
- debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election" +
- " %s".format(dataPath, data.toString))
- val partitionsForPreferredReplicaElection =
- PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(data.toString)
- val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+ debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
+ .format(dataPath, data.toString))
+ val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
+
controllerContext.controllerLock synchronized {
- try {
- controller.onPreferredReplicaElection(newPartitions)
- } catch {
- case e => error("Error completing preferred replica leader election for partitions %s"
- .format(partitionsForPreferredReplicaElection.mkString(",")), e)
- } finally {
- controller.removePartitionsFromPreferredReplicaElection(newPartitions)
- }
+ info("These partitions are already undergoing preferred replica election: %s"
+ .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+ val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+ controller.onPreferredReplicaElection(newPartitions)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index d295781..7a06c24 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -18,7 +18,7 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.utils.Logging
-import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
trait PartitionLeaderSelector {
@@ -125,9 +125,9 @@ with Logging {
val preferredReplica = assignedReplicas.head
// check if preferred replica is the current leader
val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
- if(currentLeader == preferredReplica) {
- throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
- .format(preferredReplica, topicAndPartition))
+ if (currentLeader == preferredReplica) {
+ throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
+ .format(preferredReplica, topicAndPartition))
} else {
info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
" Trigerring preferred replica leader election")
http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 654fa2e..da47ac8 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
import collection.JavaConversions._
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
-import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.utils.{Logging, ZkUtils}
import org.I0Itec.zkclient.IZkChildListener
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -315,6 +315,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
} catch {
+ case lenne: LeaderElectionNotNeededException => // swallow
case nroe: NoReplicaOnlineException => throw nroe
case sce =>
val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9a0e250..ce1904b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -633,22 +633,7 @@ object ZkUtils extends Logging {
// read the partitions and their new replica list
val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1
jsonPartitionListOpt match {
- case Some(jsonPartitionList) => parsePreferredReplicaElectionData(jsonPartitionList)
- case None => Set.empty[TopicAndPartition]
- }
- }
-
- def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = {
- Json.parseFull(jsonData) match {
- case Some(m) =>
- val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]]
- val partitions = topicAndPartitions.map { p =>
- val topicPartitionMap = p
- val topic = topicPartitionMap.get("topic").get
- val partition = topicPartitionMap.get("partition").get.toInt
- TopicAndPartition(topic, partition)
- }
- Set.empty[TopicAndPartition] ++ partitions
+ case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList)
case None => Set.empty[TopicAndPartition]
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 6c80c4c..b0a0e09 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -332,7 +332,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val preferredReplicaElectionZkData = ZkUtils.readData(zkClient,
ZkUtils.PreferredReplicaLeaderElectionPath)._1
val partitionsUndergoingPreferredReplicaElection =
- PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(preferredReplicaElectionZkData)
+ PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
partitionsUndergoingPreferredReplicaElection)
}