You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/17 21:00:20 UTC

kafka git commit: KAFKA-2746; Add support for using ConsumerGroupCommand on secure install

Repository: kafka
Updated Branches:
  refs/heads/trunk 52d5e8839 -> ffc0965d3


KAFKA-2746; Add support for using ConsumerGroupCommand on secure install

Author: Ashish Singh <as...@cloudera.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>

Closes #534 from SinghAsDev/KAFKA-2746


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffc0965d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffc0965d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffc0965d

Branch: refs/heads/trunk
Commit: ffc0965d38c364272078d771fc5ed5f8784e4012
Parents: 52d5e88
Author: Ashish Singh <as...@cloudera.com>
Authored: Tue Nov 17 12:00:16 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Nov 17 12:00:16 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    |  3 ++
 .../kafka/admin/ConsumerGroupCommand.scala      | 36 +++++++++-----------
 2 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ffc0965d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 181080f..53b6fdb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -13,6 +13,7 @@
 package kafka.admin
 
 import java.nio.ByteBuffer
+import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.common.KafkaException
@@ -209,6 +210,8 @@ object AdminClient {
     create(new AdminConfig(config))
   }
 
+  def create(props: Properties): AdminClient = create(props.asScala.toMap)
+
   def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
 
   def create(config: AdminConfig): AdminClient = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffc0965d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2d95767..d71499e 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -26,6 +26,7 @@ import kafka.common.{TopicAndPartition, _}
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNoNodeException
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.security.JaasUtils
@@ -75,15 +76,6 @@ object ConsumerGroupCommand {
     }
   }
 
-  private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
-    val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
-    require(configsToBeAdded.forall(config => config.length == 2),
-      "Invalid config: all configs to be added must be in the format \"key=val\".")
-    val props = new Properties
-    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
-    props
-  }
-
   sealed trait ConsumerGroupService {
 
     def list(): Unit
@@ -160,9 +152,9 @@ object ConsumerGroupCommand {
     }
 
     protected def describeGroup(group: String) {
-      val configs = parseConfigs(opts)
-      val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
-      val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
+      val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
+      val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", "600").toInt
+      val channelRetryBackoffMs = props.getProperty("channelRetryBackoffMsOpt", "300").toInt
       val topics = zkUtils.getTopicsByConsumerGroup(group)
       if (topics.isEmpty)
         println("No topic available for consumer group provided")
@@ -352,8 +344,11 @@ object ConsumerGroupCommand {
       if (consumer != null) consumer.close()
     }
 
-    private def createAdminClient(): AdminClient =
-      AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
+    private def createAdminClient(): AdminClient = {
+      val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
+      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+      AdminClient.create(props)
+    }
 
     private def getConsumer() = {
       if (consumer == null)
@@ -371,6 +366,8 @@ object ConsumerGroupCommand {
       properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
       properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
       properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
+      if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))
+
       new KafkaConsumer(properties)
     }
 
@@ -390,7 +387,6 @@ object ConsumerGroupCommand {
     val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to."
     val GroupDoc = "The consumer group we wish to act on."
     val TopicDoc = "The topic whose consumer group information should be deleted."
-    val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600"
     val ListDoc = "List all consumer groups."
     val DescribeDoc = "Describe consumer group and list offset lag related to given group."
     val nl = System.getProperty("line.separator")
@@ -402,6 +398,7 @@ object ConsumerGroupCommand {
       "for every consumer group. For instance --topic t1" + nl +
       "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active."
     val NewConsumerDoc = "Use new consumer."
+    val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
     val parser = new OptionParser
     val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
                              .withRequiredArg
@@ -419,14 +416,14 @@ object ConsumerGroupCommand {
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])
-    val configOpt = parser.accepts("config", ConfigDoc)
-                          .withRequiredArg
-                          .describedAs("name=value")
-                          .ofType(classOf[String])
     val listOpt = parser.accepts("list", ListDoc)
     val describeOpt = parser.accepts("describe", DescribeDoc)
     val deleteOpt = parser.accepts("delete", DeleteDoc)
     val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
+    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+                                  .withRequiredArg
+                                  .describedAs("command config property file")
+                                  .ofType(classOf[String])
     val options = parser.parse(args : _*)
 
     val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
@@ -460,7 +457,6 @@ object ConsumerGroupCommand {
       // check invalid args
       CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt)
-      CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt)
     }
   }
 }