You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/12/06 01:57:33 UTC

git commit: KAFKA-1167 Improve the kafka-topics tool to list the topics with overridden configs; reviewed by Jun Rao

Updated Branches:
  refs/heads/trunk 7dee06ee9 -> 876cfdb59


KAFKA-1167 Improve the kafka-topics tool to list the topics with overridden configs; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/876cfdb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/876cfdb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/876cfdb5

Branch: refs/heads/trunk
Commit: 876cfdb59730f0e0c0f87aed15d65fa747c19bd3
Parents: 7dee06e
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Dec 5 16:57:26 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Dec 5 16:57:26 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 23 ++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/876cfdb5/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 3c08dee..d25aae3 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -51,7 +51,7 @@ object TopicCommand {
     else if(opts.options.has(opts.deleteOpt))
       deleteTopic(zkClient, opts)
     else if(opts.options.has(opts.listOpt))
-      listTopics(zkClient)
+      listTopics(zkClient, opts)
     else if(opts.options.has(opts.describeOpt))
       describeTopic(zkClient, opts)
 
@@ -109,9 +109,22 @@ object TopicCommand {
     }
   }
   
-  def listTopics(zkClient: ZkClient) {
-    for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
-      println(topic)
+  def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
+    if(opts.options.has(opts.topicsWithOverridesOpt)) {
+      ZkUtils.getAllTopics(zkClient).sorted.foreach { topic =>
+        val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+        if(configs.size() != 0) {
+          val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+          val numPartitions = replicaAssignment.size
+          val replicationFactor = replicaAssignment.head._2.size
+          println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions,
+                   replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+        }
+      }
+    } else {
+      for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
+        println(topic)
+    }
   }
   
   def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
@@ -229,6 +242,8 @@ object TopicCommand {
                                                             "if set when describing topics, only show under replicated partitions")
     val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
                                                             "if set when describing topics, only show partitions whose leader is not available")
+    val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
+                                                "if set when listing topics, only show topics that have overridden configs")
 
 
     val options = parser.parse(args : _*)