You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2016/12/07 18:09:00 UTC

kafka git commit: KAFKA-4445; PreferredLeaderElectionCommand should query zookeeper only once per topic

Repository: kafka
Updated Branches:
  refs/heads/trunk 9e72c12e9 -> 56e5627da


KAFKA-4445; PreferredReplicaLeaderElectionCommand should query ZooKeeper only once per topic

Author: Dong Lin <li...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jiangjie Qin <be...@gmail.com>

Closes #2170 from lindong28/KAFAK-4445


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/56e5627d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/56e5627d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/56e5627d

Branch: refs/heads/trunk
Commit: 56e5627da5bfa01d1fa95e760a6f45949f89996a
Parents: 9e72c12
Author: Dong Lin <li...@gmail.com>
Authored: Wed Dec 7 10:08:46 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Wed Dec 7 10:08:46 2016 -0800

----------------------------------------------------------------------
 .../PreferredReplicaLeaderElectionCommand.scala | 34 +++++++-------------
 1 file changed, 11 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/56e5627d/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 81014b1..960d526 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -67,7 +67,6 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkUtils, partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
-      println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
     } catch {
       case e: Throwable =>
         println("Failed to start preferred replica election")
@@ -107,7 +106,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
     try {
       zkUtils.createPersistentPath(zkPath, jsonData)
-      info("Created preferred replica election path with %s".format(jsonData))
+      println("Created preferred replica election path with %s".format(jsonData))
     } catch {
       case _: ZkNodeExistsException =>
         val partitionsUndergoingPreferredReplicaElection =
@@ -119,32 +118,21 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
   }
 }
 
-class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitions: scala.collection.Set[TopicAndPartition])
-  extends Logging {
+class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitionsFromUser: scala.collection.Set[TopicAndPartition]) {
   def moveLeaderToPreferredReplica() = {
     try {
-      val validPartitions = partitions.filter(p => validatePartition(zkUtils, p.topic, p.partition))
+      val topics = partitionsFromUser.map(_.topic).toSet
+      val partitionsFromZk = zkUtils.getPartitionsForTopics(topics.toSeq).flatMap{ case (topic, partitions) =>
+        partitions.map(TopicAndPartition(topic, _))
+      }.toSet
+
+      val (validPartitions, invalidPartitions) = partitionsFromUser.partition(partitionsFromZk.contains)
       PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, validPartitions)
+
+      println("Successfully started preferred replica election for partitions %s".format(validPartitions))
+      invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p)))
     } catch {
       case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
     }
   }
-
-  def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): Boolean = {
-    // check if partition exists
-    val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic)
-    partitionsOpt match {
-      case Some(partitions) =>
-        if(partitions.contains(partition)) {
-          true
-        } else {
-          error("Skipping preferred replica leader election for partition [%s,%d] ".format(topic, partition) +
-            "since it doesn't exist")
-          false
-        }
-      case None => error("Skipping preferred replica leader election for partition " +
-        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
-        false
-    }
-  }
 }