You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/08 18:48:20 UTC

git commit: kafka-850; add an option to show under replicated partitions in list topic command; patched by Jun Rao; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 756be5363 -> d4a70eb9b


kafka-850; add an option to show under replicated partitions in list topic command; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d4a70eb9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d4a70eb9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d4a70eb9

Branch: refs/heads/0.8
Commit: d4a70eb9b04bd4cec286cfb09eeeb3794ab4f4ee
Parents: 756be53
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Apr 8 09:48:03 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 8 09:48:03 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/ListTopicCommand.scala  |   38 +++++++++++----
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    2 +-
 2 files changed, 29 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d4a70eb9/core/src/main/scala/kafka/admin/ListTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index 095469b..c760cc0 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -20,7 +20,6 @@ package kafka.admin
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
-import kafka.common.ErrorMapping
 
 object ListTopicCommand {
 
@@ -36,6 +35,10 @@ object ListTopicCommand {
                            .withRequiredArg
                            .describedAs("urls")
                            .ofType(classOf[String])
+    val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
+                                                            "if set, only show under replicated partitions")
+    val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
+                                                            "if set, only show partitions whose leader is not available")
 
     val options = parser.parse(args : _*)
 
@@ -49,21 +52,24 @@ object ListTopicCommand {
 
     val topic = options.valueOf(topicOpt)
     val zkConnect = options.valueOf(zkConnectOpt)
+    val reportUnderReplicatedPartitions = if (options.has(reportUnderReplicatedPartitionsOpt)) true else false
+    val reportUnavailablePartitions = if (options.has(reportUnavailablePartitionsOpt)) true else false
     var zkClient: ZkClient = null
     try {
       var topicList: Seq[String] = Nil
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
 
       if (topic == "")
-        topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath).sorted
+        topicList = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
       else
         topicList = List(topic)
 
       if (topicList.size <= 0)
         println("no topics exist!")
 
+      val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
       for (t <- topicList)
-        showTopic(t, zkClient)
+        showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers)
     }
     catch {
       case e =>
@@ -76,14 +82,26 @@ object ListTopicCommand {
     }
   }
 
-  def showTopic(topic: String, zkClient: ZkClient) {
-    val topicMetaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    topicMetaData.errorCode match {
-      case ErrorMapping.UnknownTopicOrPartitionCode =>
+  def showTopic(topic: String, zkClient: ZkClient, reportUnderReplicatedPartitions: Boolean,
+                reportUnavailablePartitions: Boolean, liveBrokers: Set[Int]) {
+    ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
+      case Some(topicPartitionAssignment) =>
+        val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
+        for ((partitionId, assignedReplicas) <- sortedPartitions) {
+          val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
+          val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
+          if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
+              (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
+              (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
+            print("topic: " + topic)
+            print("\tpartition: " + partitionId)
+            print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
+            print("\treplicas: " + assignedReplicas.mkString(","))
+            println("\tisr: " + inSyncReplicas.mkString(","))
+          }
+        }
+      case None =>
         println("topic " + topic + " doesn't exist!")
-      case _ =>
-        for (part <- topicMetaData.partitionsMetadata)
-          println("topic: " + topic + "\t" + part.toString)
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d4a70eb9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ce1904b..5673ae2 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -72,7 +72,7 @@ object ZkUtils extends Logging {
   }
 
   def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
-    val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+    val brokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).sorted
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }