You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/07 06:00:24 UTC
git commit: kafka-1232; make TopicCommand more consistent;
patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede
Updated Branches:
refs/heads/trunk 167acb832 -> 1032bf740
kafka-1232; make TopicCommand more consistent; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1032bf74
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1032bf74
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1032bf74
Branch: refs/heads/trunk
Commit: 1032bf74054a198d5d663713be1152a9216ca150
Parents: 167acb8
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Feb 6 21:00:17 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 6 21:00:17 2014 -0800
----------------------------------------------------------------------
.../main/scala/kafka/admin/TopicCommand.scala | 125 +++++++++++--------
.../scala/kafka/utils/CommandLineUtils.scala | 24 +++-
2 files changed, 90 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1032bf74/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 65510eb..fc8d686 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -40,10 +40,9 @@ object TopicCommand {
opts.parser.printHelpOn(System.err)
System.exit(1)
}
-
- CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
- if (!opts.options.has(opts.listOpt)) CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-
+
+ opts.checkArgs()
+
val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
try {
@@ -67,10 +66,13 @@ object TopicCommand {
}
private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = {
- val topicsSpec = opts.options.valueOf(opts.topicOpt)
- val topicsFilter = new Whitelist(topicsSpec)
- val allTopics = ZkUtils.getAllTopics(zkClient)
- allTopics.filter(topicsFilter.isTopicAllowed).sorted
+ val allTopics = ZkUtils.getAllTopics(zkClient).sorted
+ if (opts.options.has(opts.topicOpt)) {
+ val topicsSpec = opts.options.valueOf(opts.topicOpt)
+ val topicsFilter = new Whitelist(topicsSpec)
+ allTopics.filter(topicsFilter.isTopicAllowed)
+ } else
+ allTopics
}
def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
@@ -107,10 +109,8 @@ object TopicCommand {
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
- println("adding partitions succeeded!")
+ println("Adding partitions succeeded!")
}
- if(opts.options.has(opts.replicationFactorOpt))
- Utils.croak("Changing the replication factor is not supported.")
}
}
@@ -123,53 +123,49 @@ object TopicCommand {
}
def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
- if(opts.options.has(opts.topicsWithOverridesOpt)) {
- ZkUtils.getAllTopics(zkClient).sorted.foreach { topic =>
- val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
- if(configs.size() != 0) {
- val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
- val numPartitions = replicaAssignment.size
- val replicationFactor = replicaAssignment.head._2.size
- println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions,
- replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
- }
- }
- } else {
- for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
+ val topics = getTopics(zkClient, opts)
+ for(topic <- topics)
println(topic)
- }
}
def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
val topics = getTopics(zkClient, opts)
val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
+ val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false
val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
for (topic <- topics) {
ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
case Some(topicPartitionAssignment) =>
+ val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
+ val describePartitions: Boolean = !reportOverriddenConfigs
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
- if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) {
- println(topic)
- val config = AdminUtils.fetchTopicConfig(zkClient, topic)
- println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
- println("\tpartitions: " + sortedPartitions.size)
+ if (describeConfigs) {
+ val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+ if (!reportOverriddenConfigs || configs.size() != 0) {
+ val numPartitions = topicPartitionAssignment.size
+ val replicationFactor = topicPartitionAssignment.head._2.size
+ println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s"
+ .format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+ }
}
- for ((partitionId, assignedReplicas) <- sortedPartitions) {
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
- val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
- if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
- (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
- (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
- print("\t\ttopic: " + topic)
- print("\tpartition: " + partitionId)
- print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
- print("\treplicas: " + assignedReplicas.mkString(","))
- println("\tisr: " + inSyncReplicas.mkString(","))
+ if (describePartitions) {
+ for ((partitionId, assignedReplicas) <- sortedPartitions) {
+ val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
+ val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
+ if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
+ (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
+ (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
+ print("\tTopic: " + topic)
+ print("\tPartition: " + partitionId)
+ print("\tLeader: " + (if(leader.isDefined) leader.get else "none"))
+ print("\tReplicas: " + assignedReplicas.mkString(","))
+ println("\tIsr: " + inSyncReplicas.mkString(","))
+ }
}
}
case None =>
- println("topic " + topic + " doesn't exist!")
+ println("Topic " + topic + " doesn't exist!")
}
}
}
@@ -187,15 +183,15 @@ object TopicCommand {
}
def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
- val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.split("""\s*=\s*"""))
- if(opts.options.has(opts.createOpt))
- require(configsToBeDeleted.size == 0, "Invalid topic config: all configs on create topic must be in the format \"key=val\".")
- require(configsToBeDeleted.forall(config => config.length == 1),
- "Invalid topic config: all configs to be deleted must be in the format \"key\".")
- val propsToBeDeleted = new Properties
- configsToBeDeleted.foreach(pair => propsToBeDeleted.setProperty(pair(0).trim, ""))
- LogConfig.validateNames(propsToBeDeleted)
- configsToBeDeleted.map(pair => pair(0))
+ if (opts.options.has(opts.deleteConfigOpt)) {
+ val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
+ val propsToBeDeleted = new Properties
+ configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
+ LogConfig.validateNames(propsToBeDeleted)
+ configsToBeDeleted
+ }
+ else
+ Seq.empty
}
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
@@ -245,7 +241,7 @@ object TopicCommand {
.withRequiredArg
.describedAs("replication factor")
.ofType(classOf[java.lang.Integer])
- val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
+ val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created or altered.")
.withRequiredArg
.describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
@@ -255,9 +251,32 @@ object TopicCommand {
val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
"if set when describing topics, only show partitions whose leader is not available")
val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
- "if set when listing topics, only show topics that have overridden configs")
+ "if set when describing topics, only show topics that have overridden configs")
val options = parser.parse(args : _*)
+
+ val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, deleteOpt, describeOpt, listOpt)
+
+ def checkArgs() {
+ // check required args
+ CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+ if (!options.has(listOpt) && !options.has(describeOpt))
+ CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
+
+ // check invalid args
+ CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt,
+ allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + replicationFactorOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
+ allTopicLevelOpts -- Set(describeOpt) + reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
+ allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
+ allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1032bf74/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 5f563ca..726c302 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -17,20 +17,32 @@
package kafka.utils
import joptsimple.{OptionSpec, OptionSet, OptionParser}
+import scala.collection.Set
-/**
+ /**
* Helper functions for dealing with command line utilities
*/
object CommandLineUtils extends Logging {
- def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
- for(arg <- required) {
- if(!options.has(arg)) {
- System.err.println("Missing required argument \"" + arg + "\"")
+ def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+ for(arg <- required) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+ }
+
+ def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]) {
+ if(options.has(usedOption)) {
+ for(arg <- invalidOptions) {
+ if(options.has(arg)) {
+ System.err.println("Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
}
}
-
+ }
}
\ No newline at end of file