You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2016/12/07 18:09:00 UTC
kafka git commit: KAFKA-4445;
PreferredLeaderElectionCommand should query zookeeper only once per
topic
Repository: kafka
Updated Branches:
refs/heads/trunk 9e72c12e9 -> 56e5627da
KAFKA-4445; PreferredLeaderElectionCommand should query zookeeper only once per topic
Author: Dong Lin <li...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jiangjie Qin <be...@gmail.com>
Closes #2170 from lindong28/KAFAK-4445
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/56e5627d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/56e5627d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/56e5627d
Branch: refs/heads/trunk
Commit: 56e5627da5bfa01d1fa95e760a6f45949f89996a
Parents: 9e72c12
Author: Dong Lin <li...@gmail.com>
Authored: Wed Dec 7 10:08:46 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Wed Dec 7 10:08:46 2016 -0800
----------------------------------------------------------------------
.../PreferredReplicaLeaderElectionCommand.scala | 34 +++++++-------------
1 file changed, 11 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/56e5627d/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 81014b1..960d526 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -67,7 +67,6 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkUtils, partitionsForPreferredReplicaElection)
preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
- println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
} catch {
case e: Throwable =>
println("Failed to start preferred replica election")
@@ -107,7 +106,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
try {
zkUtils.createPersistentPath(zkPath, jsonData)
- info("Created preferred replica election path with %s".format(jsonData))
+ println("Created preferred replica election path with %s".format(jsonData))
} catch {
case _: ZkNodeExistsException =>
val partitionsUndergoingPreferredReplicaElection =
@@ -119,32 +118,21 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}
-class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitions: scala.collection.Set[TopicAndPartition])
- extends Logging {
+class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitionsFromUser: scala.collection.Set[TopicAndPartition]) {
def moveLeaderToPreferredReplica() = {
try {
- val validPartitions = partitions.filter(p => validatePartition(zkUtils, p.topic, p.partition))
+ val topics = partitionsFromUser.map(_.topic).toSet
+ val partitionsFromZk = zkUtils.getPartitionsForTopics(topics.toSeq).flatMap{ case (topic, partitions) =>
+ partitions.map(TopicAndPartition(topic, _))
+ }.toSet
+
+ val (validPartitions, invalidPartitions) = partitionsFromUser.partition(partitionsFromZk.contains)
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, validPartitions)
+
+ println("Successfully started preferred replica election for partitions %s".format(validPartitions))
+ invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p)))
} catch {
case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
}
}
-
- def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): Boolean = {
- // check if partition exists
- val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic)
- partitionsOpt match {
- case Some(partitions) =>
- if(partitions.contains(partition)) {
- true
- } else {
- error("Skipping preferred replica leader election for partition [%s,%d] ".format(topic, partition) +
- "since it doesn't exist")
- false
- }
- case None => error("Skipping preferred replica leader election for partition " +
- "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
- false
- }
- }
}