You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/17 21:00:20 UTC
kafka git commit: KAFKA-2746; Add support for using ConsumerGroupCommand on secure install
Repository: kafka
Updated Branches:
refs/heads/trunk 52d5e8839 -> ffc0965d3
KAFKA-2746; Add support for using ConsumerGroupCommand on secure install
Author: Ashish Singh <as...@cloudera.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>
Closes #534 from SinghAsDev/KAFKA-2746
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffc0965d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffc0965d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffc0965d
Branch: refs/heads/trunk
Commit: ffc0965d38c364272078d771fc5ed5f8784e4012
Parents: 52d5e88
Author: Ashish Singh <as...@cloudera.com>
Authored: Tue Nov 17 12:00:16 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Nov 17 12:00:16 2015 -0800
----------------------------------------------------------------------
.../main/scala/kafka/admin/AdminClient.scala | 3 ++
.../kafka/admin/ConsumerGroupCommand.scala | 36 +++++++++-----------
2 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ffc0965d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 181080f..53b6fdb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -13,6 +13,7 @@
package kafka.admin
import java.nio.ByteBuffer
+import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import kafka.common.KafkaException
@@ -209,6 +210,8 @@ object AdminClient {
create(new AdminConfig(config))
}
+ def create(props: Properties): AdminClient = create(props.asScala.toMap)
+
def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
def create(config: AdminConfig): AdminClient = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ffc0965d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2d95767..d71499e 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -26,6 +26,7 @@ import kafka.common.{TopicAndPartition, _}
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNoNodeException
+import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.security.JaasUtils
@@ -75,15 +76,6 @@ object ConsumerGroupCommand {
}
}
- private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
- val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
- require(configsToBeAdded.forall(config => config.length == 2),
- "Invalid config: all configs to be added must be in the format \"key=val\".")
- val props = new Properties
- configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
- props
- }
-
sealed trait ConsumerGroupService {
def list(): Unit
@@ -160,9 +152,9 @@ object ConsumerGroupCommand {
}
protected def describeGroup(group: String) {
- val configs = parseConfigs(opts)
- val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
- val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
+ val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
+ val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", "600").toInt
+ val channelRetryBackoffMs = props.getProperty("channelRetryBackoffMsOpt", "300").toInt
val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics.isEmpty)
println("No topic available for consumer group provided")
@@ -352,8 +344,11 @@ object ConsumerGroupCommand {
if (consumer != null) consumer.close()
}
- private def createAdminClient(): AdminClient =
- AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
+ private def createAdminClient(): AdminClient = {
+ val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+ AdminClient.create(props)
+ }
private def getConsumer() = {
if (consumer == null)
@@ -371,6 +366,8 @@ object ConsumerGroupCommand {
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
+ if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))
+
new KafkaConsumer(properties)
}
@@ -390,7 +387,6 @@ object ConsumerGroupCommand {
val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to."
val GroupDoc = "The consumer group we wish to act on."
val TopicDoc = "The topic whose consumer group information should be deleted."
- val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600"
val ListDoc = "List all consumer groups."
val DescribeDoc = "Describe consumer group and list offset lag related to given group."
val nl = System.getProperty("line.separator")
@@ -402,6 +398,7 @@ object ConsumerGroupCommand {
"for every consumer group. For instance --topic t1" + nl +
"WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active."
val NewConsumerDoc = "Use new consumer."
+ val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
.withRequiredArg
@@ -419,14 +416,14 @@ object ConsumerGroupCommand {
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
- val configOpt = parser.accepts("config", ConfigDoc)
- .withRequiredArg
- .describedAs("name=value")
- .ofType(classOf[String])
val listOpt = parser.accepts("list", ListDoc)
val describeOpt = parser.accepts("describe", DescribeDoc)
val deleteOpt = parser.accepts("delete", DeleteDoc)
val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
+ val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+ .withRequiredArg
+ .describedAs("command config property file")
+ .ofType(classOf[String])
val options = parser.parse(args : _*)
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
@@ -460,7 +457,6 @@ object ConsumerGroupCommand {
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt)
}
}
}