You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/08 18:48:20 UTC
git commit: kafka-850; add an option to show under replicated partitions in list topic command; patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 756be5363 -> d4a70eb9b
kafka-850; add an option to show under replicated partitions in list topic command; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d4a70eb9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d4a70eb9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d4a70eb9
Branch: refs/heads/0.8
Commit: d4a70eb9b04bd4cec286cfb09eeeb3794ab4f4ee
Parents: 756be53
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Apr 8 09:48:03 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 8 09:48:03 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/admin/ListTopicCommand.scala | 38 +++++++++++----
core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +-
2 files changed, 29 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d4a70eb9/core/src/main/scala/kafka/admin/ListTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index 095469b..c760cc0 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -20,7 +20,6 @@ package kafka.admin
import joptsimple.OptionParser
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
-import kafka.common.ErrorMapping
object ListTopicCommand {
@@ -36,6 +35,10 @@ object ListTopicCommand {
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
+ val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
+ "if set, only show under replicated partitions")
+ val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
+ "if set, only show partitions whose leader is not available")
val options = parser.parse(args : _*)
@@ -49,21 +52,24 @@ object ListTopicCommand {
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
+ val reportUnderReplicatedPartitions = if (options.has(reportUnderReplicatedPartitionsOpt)) true else false
+ val reportUnavailablePartitions = if (options.has(reportUnavailablePartitionsOpt)) true else false
var zkClient: ZkClient = null
try {
var topicList: Seq[String] = Nil
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
if (topic == "")
- topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath).sorted
+ topicList = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
else
topicList = List(topic)
if (topicList.size <= 0)
println("no topics exist!")
+ val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
for (t <- topicList)
- showTopic(t, zkClient)
+ showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers)
}
catch {
case e =>
@@ -76,14 +82,26 @@ object ListTopicCommand {
}
}
- def showTopic(topic: String, zkClient: ZkClient) {
- val topicMetaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
- topicMetaData.errorCode match {
- case ErrorMapping.UnknownTopicOrPartitionCode =>
+ def showTopic(topic: String, zkClient: ZkClient, reportUnderReplicatedPartitions: Boolean,
+ reportUnavailablePartitions: Boolean, liveBrokers: Set[Int]) {
+ ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
+ case Some(topicPartitionAssignment) =>
+ val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
+ for ((partitionId, assignedReplicas) <- sortedPartitions) {
+ val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
+ val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
+ if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
+ (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
+ (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
+ print("topic: " + topic)
+ print("\tpartition: " + partitionId)
+ print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
+ print("\treplicas: " + assignedReplicas.mkString(","))
+ println("\tisr: " + inSyncReplicas.mkString(","))
+ }
+ }
+ case None =>
println("topic " + topic + " doesn't exist!")
- case _ =>
- for (part <- topicMetaData.partitionsMetadata)
- println("topic: " + topic + "\t" + part.toString)
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/d4a70eb9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ce1904b..5673ae2 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -72,7 +72,7 @@ object ZkUtils extends Logging {
}
def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
- val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+ val brokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).sorted
brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
}