You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/28 16:51:11 UTC

git commit: KAFKA-828 Preferred Replica Election does not delete the admin path on controller failover; reviewed by Neha Narkhede and Jun Rao

Updated Branches:
  refs/heads/0.8 f570cce1f -> 2fe3f9fef


KAFKA-828 Preferred Replica Election does not delete the admin path on controller failover; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fe3f9fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fe3f9fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fe3f9fe

Branch: refs/heads/0.8
Commit: 2fe3f9fef6e641ed64aca654b607c4d398e11d25
Parents: f570cce
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Thu Mar 28 08:50:38 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Mar 28 08:51:00 2013 -0700

----------------------------------------------------------------------
 .../PreferredReplicaLeaderElectionCommand.scala    |    7 +--
 .../scala/kafka/controller/KafkaController.scala   |   31 ++++++++-------
 .../kafka/controller/PartitionLeaderSelector.scala |    8 ++--
 .../kafka/controller/PartitionStateMachine.scala   |    3 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   17 +--------
 .../test/scala/unit/kafka/admin/AdminTest.scala    |    2 +-
 6 files changed, 27 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 7405c5a..d5de5f3 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -54,7 +54,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
         if (!options.has(jsonFileOpt))
           ZkUtils.getAllPartitions(zkClient)
         else
-          parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
+          parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
       val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
@@ -69,7 +69,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     }
   }
 
-  def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
+  def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = {
     Json.parseFull(jsonString) match {
       case Some(m) =>
         m.asInstanceOf[Map[String, Any]].get("partitions") match {
@@ -101,8 +101,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       info("Created preferred replica election path with %s".format(jsonData))
     } catch {
       case nee: ZkNodeExistsException =>
-        val partitionsUndergoingPreferredReplicaElection =
-          PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
+        val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
         throw new AdministrationException("Preferred replica leader election currently in progress for " +
           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
       case e2 => throw new AdministrationException(e2.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 229239c..9d32901 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -386,8 +386,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
-    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+    try {
+      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
+      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+    } catch {
+      case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+    } finally {
+      removePartitionsFromPreferredReplicaElection(partitions)
+    }
   }
 
   /**
@@ -910,20 +916,15 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
    */
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
-    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election" +
-      " %s".format(dataPath, data.toString))
-    val partitionsForPreferredReplicaElection =
-      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(data.toString)
-    val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
+            .format(dataPath, data.toString))
+    val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
+
     controllerContext.controllerLock synchronized {
-      try {
-        controller.onPreferredReplicaElection(newPartitions)
-      } catch {
-        case e => error("Error completing preferred replica leader election for partitions %s"
-          .format(partitionsForPreferredReplicaElection.mkString(",")), e)
-      } finally {
-        controller.removePartitionsFromPreferredReplicaElection(newPartitions)
-      }
+      info("These partitions are already undergoing preferred replica election: %s"
+             .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+      val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+      controller.onPreferredReplicaElection(newPartitions)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index d295781..7a06c24 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -18,7 +18,7 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.utils.Logging
-import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 
 trait PartitionLeaderSelector {
 
@@ -125,9 +125,9 @@ with Logging {
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
     val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
-    if(currentLeader == preferredReplica) {
-      throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
-        .format(preferredReplica, topicAndPartition))
+    if (currentLeader == preferredReplica) {
+      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
+                                                   .format(preferredReplica, topicAndPartition))
     } else {
       info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
         " Trigerring preferred replica leader election")

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 654fa2e..da47ac8 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
-import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -315,6 +315,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     } catch {
+      case lenne: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
       case sce =>
         val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9a0e250..ce1904b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -633,22 +633,7 @@ object ZkUtils extends Logging {
     // read the partitions and their new replica list
     val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1
     jsonPartitionListOpt match {
-      case Some(jsonPartitionList) => parsePreferredReplicaElectionData(jsonPartitionList)
-      case None => Set.empty[TopicAndPartition]
-    }
-  }
-
-  def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = {
-    Json.parseFull(jsonData) match {
-      case Some(m) =>
-        val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]]
-        val partitions = topicAndPartitions.map { p =>
-          val topicPartitionMap = p
-          val topic = topicPartitionMap.get("topic").get
-          val partition = topicPartitionMap.get("partition").get.toInt
-          TopicAndPartition(topic, partition)
-        }
-        Set.empty[TopicAndPartition] ++ partitions
+      case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList)
       case None => Set.empty[TopicAndPartition]
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fe3f9fe/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 6c80c4c..b0a0e09 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -332,7 +332,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val preferredReplicaElectionZkData = ZkUtils.readData(zkClient,
         ZkUtils.PreferredReplicaLeaderElectionPath)._1
     val partitionsUndergoingPreferredReplicaElection =
-      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(preferredReplicaElectionZkData)
+      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
     assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
       partitionsUndergoingPreferredReplicaElection)
   }