You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/07 06:00:24 UTC

git commit: kafka-1232; make TopicCommand more consistent; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede

Updated Branches:
  refs/heads/trunk 167acb832 -> 1032bf740


kafka-1232; make TopicCommand more consistent; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1032bf74
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1032bf74
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1032bf74

Branch: refs/heads/trunk
Commit: 1032bf74054a198d5d663713be1152a9216ca150
Parents: 167acb8
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Feb 6 21:00:17 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 6 21:00:17 2014 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 125 +++++++++++--------
 .../scala/kafka/utils/CommandLineUtils.scala    |  24 +++-
 2 files changed, 90 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1032bf74/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 65510eb..fc8d686 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -40,10 +40,9 @@ object TopicCommand {
       opts.parser.printHelpOn(System.err)
       System.exit(1)
     }
-      
-    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
-    if (!opts.options.has(opts.listOpt)) CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    
+
+    opts.checkArgs()
+
     val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
 
     try {
@@ -67,10 +66,13 @@ object TopicCommand {
   }
 
   private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = {
-    val topicsSpec = opts.options.valueOf(opts.topicOpt)
-    val topicsFilter = new Whitelist(topicsSpec)
-    val allTopics = ZkUtils.getAllTopics(zkClient)
-    allTopics.filter(topicsFilter.isTopicAllowed).sorted
+    val allTopics = ZkUtils.getAllTopics(zkClient).sorted
+    if (opts.options.has(opts.topicOpt)) {
+      val topicsSpec = opts.options.valueOf(opts.topicOpt)
+      val topicsFilter = new Whitelist(topicsSpec)
+      allTopics.filter(topicsFilter.isTopicAllowed)
+    } else
+      allTopics
   }
 
   def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
@@ -107,10 +109,8 @@ object TopicCommand {
         val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
         AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
-        println("adding partitions succeeded!")
+        println("Adding partitions succeeded!")
       }
-      if(opts.options.has(opts.replicationFactorOpt))
-        Utils.croak("Changing the replication factor is not supported.")
     }
   }
   
@@ -123,53 +123,49 @@ object TopicCommand {
   }
   
   def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
-    if(opts.options.has(opts.topicsWithOverridesOpt)) {
-      ZkUtils.getAllTopics(zkClient).sorted.foreach { topic =>
-        val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
-        if(configs.size() != 0) {
-          val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
-          val numPartitions = replicaAssignment.size
-          val replicationFactor = replicaAssignment.head._2.size
-          println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions,
-                   replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
-        }
-      }
-    } else {
-      for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
+    val topics = getTopics(zkClient, opts)
+    for(topic <- topics)
         println(topic)
-    }
   }
   
   def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
     val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
+    val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false
     val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
     for (topic <- topics) {
       ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
         case Some(topicPartitionAssignment) =>
+          val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
+          val describePartitions: Boolean = !reportOverriddenConfigs
           val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
-          if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) {
-            println(topic)
-            val config = AdminUtils.fetchTopicConfig(zkClient, topic)
-            println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
-            println("\tpartitions: " + sortedPartitions.size)
+          if (describeConfigs) {
+            val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+            if (!reportOverriddenConfigs || configs.size() != 0) {
+              val numPartitions = topicPartitionAssignment.size
+              val replicationFactor = topicPartitionAssignment.head._2.size
+              println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s"
+                .format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+            }
           }
-          for ((partitionId, assignedReplicas) <- sortedPartitions) {
-            val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
-            val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
-            if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
-                (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
-                (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
-              print("\t\ttopic: " + topic)
-              print("\tpartition: " + partitionId)
-              print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
-              print("\treplicas: " + assignedReplicas.mkString(","))
-              println("\tisr: " + inSyncReplicas.mkString(","))
+          if (describePartitions) {
+            for ((partitionId, assignedReplicas) <- sortedPartitions) {
+              val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
+              val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
+              if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
+                  (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
+                  (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
+                print("\tTopic: " + topic)
+                print("\tPartition: " + partitionId)
+                print("\tLeader: " + (if(leader.isDefined) leader.get else "none"))
+                print("\tReplicas: " + assignedReplicas.mkString(","))
+                println("\tIsr: " + inSyncReplicas.mkString(","))
+              }
             }
           }
         case None =>
-          println("topic " + topic + " doesn't exist!")
+          println("Topic " + topic + " doesn't exist!")
       }
     }
   }
@@ -187,15 +183,15 @@ object TopicCommand {
   }
 
   def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
-    val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.split("""\s*=\s*"""))
-    if(opts.options.has(opts.createOpt))
-      require(configsToBeDeleted.size == 0, "Invalid topic config: all configs on create topic must be in the format \"key=val\".")
-    require(configsToBeDeleted.forall(config => config.length == 1),
-      "Invalid topic config: all configs to be deleted must be in the format \"key\".")
-    val propsToBeDeleted = new Properties
-    configsToBeDeleted.foreach(pair => propsToBeDeleted.setProperty(pair(0).trim, ""))
-    LogConfig.validateNames(propsToBeDeleted)
-    configsToBeDeleted.map(pair => pair(0))
+    if (opts.options.has(opts.deleteConfigOpt)) {
+      val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
+      val propsToBeDeleted = new Properties
+      configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
+      LogConfig.validateNames(propsToBeDeleted)
+      configsToBeDeleted
+    }
+    else
+      Seq.empty
   }
 
   def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
@@ -245,7 +241,7 @@ object TopicCommand {
                            .withRequiredArg
                            .describedAs("replication factor")
                            .ofType(classOf[java.lang.Integer])
-    val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
+    val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created or altered.")
                            .withRequiredArg
                            .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
                                         "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
@@ -255,9 +251,32 @@ object TopicCommand {
     val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
                                                             "if set when describing topics, only show partitions whose leader is not available")
     val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
-                                                "if set when listing topics, only show topics that have overridden configs")
+                                                "if set when describing topics, only show topics that have overridden configs")
 
     val options = parser.parse(args : _*)
+
+    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, deleteOpt, describeOpt, listOpt)
+
+    def checkArgs() {
+      // check required args
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      if (!options.has(listOpt) && !options.has(describeOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
+
+      // check invalid args
+      CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt,
+        allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + replicationFactorOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
+        allTopicLevelOpts -- Set(describeOpt) + reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
+        allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
+        allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
+    }
   }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1032bf74/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 5f563ca..726c302 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -17,20 +17,32 @@
  package kafka.utils
 
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
+import scala.collection.Set
 
-/**
+ /**
  * Helper functions for dealing with command line utilities
  */
 object CommandLineUtils extends Logging {
 
-    def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
-      for(arg <- required) {
-        if(!options.has(arg)) {
-          System.err.println("Missing required argument \"" + arg + "\"")
+  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+    for(arg <- required) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+  }
+  
+  def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]) {
+    if(options.has(usedOption)) {
+      for(arg <- invalidOptions) {
+        if(options.has(arg)) {
+          System.err.println("Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"")
           parser.printHelpOn(System.err)
           System.exit(1)
         }
       }
     }
-  
+  }
 }
\ No newline at end of file