You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/21 09:21:00 UTC

[jira] [Commented] (KAFKA-7054) Kafka describe command should throw topic doesn't exist exception.

    [ https://issues.apache.org/jira/browse/KAFKA-7054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726579#comment-16726579 ] 

ASF GitHub Bot commented on KAFKA-7054:
---------------------------------------

omkreddy closed pull request #5211: [KAFKA-7054] Kafka describe command should throw topic doesn't exist exception.
URL: https://github.com/apache/kafka/pull/5211
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 92cde7e4dc2..84239b0aaa9 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -121,10 +121,7 @@ object TopicCommand extends Logging {
   def alterTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
-    if (topics.isEmpty && !ifExists) {
-      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
-          opts.options.valueOf(opts.zkConnectOpt)))
-    }
+    ensureTopicExists(opts, topics, ifExists)
     val adminZkClient = new AdminZkClient(zkClient)
     topics.foreach { topic =>
       val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
@@ -180,10 +177,7 @@ object TopicCommand extends Logging {
   def deleteTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
-    if (topics.isEmpty && !ifExists) {
-      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
-          opts.options.valueOf(opts.zkConnectOpt)))
-    }
+    ensureTopicExists(opts, topics, ifExists)
     topics.foreach { topic =>
       try {
         if (Topic.isInternal(topic)) {
@@ -206,6 +200,8 @@ object TopicCommand extends Logging {
 
   def describeTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
+    val topicOptWithExits = opts.options.has(opts.topicOpt) && opts.options.has(opts.ifExistsOpt)
+    ensureTopicExists(opts, topics, topicOptWithExits)
     val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
     val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
     val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
@@ -258,6 +254,21 @@ object TopicCommand extends Logging {
     }
   }
 
+  /**
+    * ensures topic existence and throws exception if topic doesn't exist
+    *
+    * @param opts
+    * @param topics
+    * @param topicOptWithExists
+    */
+  private def ensureTopicExists(opts: TopicCommandOptions, topics: Seq[String], topicOptWithExists: Boolean) = {
+    if (topics.isEmpty && !topicOptWithExists) {
+      // If given topic doesn't exist then throw exception
+      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
+        opts.options.valueOf(opts.zkConnectOpt)))
+    }
+  }
+
   def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
     val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
     require(configsToBeAdded.forall(config => config.length == 2),
@@ -349,7 +360,7 @@ object TopicCommand extends Logging {
     val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
                                                 "if set when describing topics, only show topics that have overridden configs")
     val ifExistsOpt = parser.accepts("if-exists",
-                                     "if set when altering or deleting topics, the action will only execute if the topic exists")
+                                     "if set when altering or deleting or describing topics, the action will only execute if the topic exists")
     val ifNotExistsOpt = parser.accepts("if-not-exists",
                                         "if set when creating topics, the action will only execute if the topic does not already exist")
 
@@ -366,6 +377,8 @@ object TopicCommand extends Logging {
     def checkArgs() {
       // check required args
       CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      if(options.has(describeOpt) && options.has(ifExistsOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
       if (!options.has(listOpt) && !options.has(describeOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
 
@@ -383,7 +396,7 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
     }
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index a469f8efe04..aebe6f76b94 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -160,6 +160,24 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     TopicCommand.createTopic(zkClient, createNotExistsOpts)
   }
 
+  @Test
+  def testDescribeIfTopicNotExists() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    // describe topic that does not exist
+    val describeOpts = new TopicCommandOptions(Array("--topic", "test"))
+    intercept[IllegalArgumentException] {
+      TopicCommand.describeTopic(zkClient, describeOpts)
+    }
+
+    // describe topic that does not exist with --if-exists
+    val describeOptsWithExists = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
+    // should not throw any error
+    TopicCommand.describeTopic(zkClient, describeOptsWithExists)
+  }
+
   @Test
   def testCreateAlterTopicWithRackAware() {
     val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Kafka describe command should throw topic doesn't exist exception.
> ------------------------------------------------------------------
>
>                 Key: KAFKA-7054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7054
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: Manohar Vanam
>            Priority: Minor
>             Fix For: 2.2.0
>
>
> If topic doesn't exist thenĀ Kafka describe command should throw topic doesn't exist exception.
> like alter and delete commands :
> {code:java}
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path localhost:2181
> [2018-06-13 15:08:13,111] ERROR java.lang.IllegalArgumentException: Topic manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:184)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> local:bin mvanam$ ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic manu
> Error while executing topic command : Topic manu does not exist on ZK path localhost:2181
> [2018-06-13 15:08:43,663] ERROR java.lang.IllegalArgumentException: Topic manu does not exist on ZK path localhost:2181
>  at kafka.admin.TopicCommand$.getTopics(TopicCommand.scala:91)
>  at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:125)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)