Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/28 00:30:51 UTC

[GitHub] [kafka] vinothchandar opened a new pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

vinothchandar opened a new pull request #8737:
URL: https://github.com/apache/kafka/pull/8737


    - Fixed command line arg checking
    - Added unit test cases
    - Local testing with a multi node Kafka cluster
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432157321



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -247,8 +247,12 @@ object TopicCommand extends Logging {
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
         createResult.all().get()
         println(s"Created topic ${topic.name}.")
-      } else {
-        throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
+      } catch {
+        case e : ExecutionException =>
+          if (e.getCause == null)
+            throw e
+          if (!e.getCause.isInstanceOf[TopicExistsException] || !topic.ifTopicDoesntExist())

Review comment:
       I interpreted @stanislavkozlovski's suggestion as being something like this:
   ```
             if (!(e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist()))
               throw e.getCause
   ```
   So you wouldn't need to return out of the method in this case.
   
   I do think that using AND here as suggested would be more intuitive.  The reason is that we're basically handling a special case here: we got TopicExistsException AND we had `--if-not-exists`.  The OR construct feels unintuitive since it expresses the negation of that special case as a disjunction (enumerating all the ways we could not be in the special case).
   
   Anyway, I don't feel that strongly about it.  If you really want to keep it as is, that's OK.
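
The two forms discussed in this comment are logically equivalent (De Morgan's law), so the choice is purely about readability. The following is a minimal sketch of that equivalence, not the code from the PR: `TopicExistsException` here is a local stand-in so the example runs without Kafka on the classpath, and `shouldRethrowOr` / `shouldRethrowAnd` are hypothetical helper names.

```scala
object IfNotExistsSketch {
  // Hypothetical stand-in for Kafka's TopicExistsException (assumption: defined locally for this sketch).
  final class TopicExistsException(msg: String) extends RuntimeException(msg)

  // OR form from the diff: rethrow if the cause is some other error, or if --if-not-exists was not given.
  def shouldRethrowOr(cause: Throwable, ifNotExists: Boolean): Boolean =
    !cause.isInstanceOf[TopicExistsException] || !ifNotExists

  // AND form suggested in review: name the special case first, then negate it.
  def shouldRethrowAnd(cause: Throwable, ifNotExists: Boolean): Boolean =
    !(cause.isInstanceOf[TopicExistsException] && ifNotExists)

  def main(args: Array[String]): Unit = {
    val causes = Seq[Throwable](new TopicExistsException("exists"), new IllegalStateException("other"))
    // Walk the full truth table and check that both forms agree on every row.
    for (cause <- causes; flag <- Seq(true, false)) {
      assert(shouldRethrowOr(cause, flag) == shouldRethrowAnd(cause, flag))
      println(s"${cause.getClass.getSimpleName}, ifNotExists=$flag -> rethrow=${shouldRethrowAnd(cause, flag)}")
    }
  }
}
```

Only one of the four combinations (a `TopicExistsException` together with `--if-not-exists`) suppresses the rethrow, which is why the AND form reads as "handle the special case".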





[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432566976



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -736,8 +743,8 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))

Review comment:
       I think you meant a test. If so, the tests I added already use `--bootstrap-server` with these options.





[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r434223936



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,24 +263,29 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {

Review comment:
       We don't need to check if topics is nonempty here





[GitHub] [kafka] cmccabe merged pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #8737:
URL: https://github.com/apache/kafka/pull/8737


   



[GitHub] [kafka] cmccabe commented on pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#issuecomment-637855983


   LGTM



[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432814222



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,7 +263,8 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       Sounds good.  Thanks for digging into this!





[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432166896



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,7 +263,8 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       If the topic does not exist, `ensureTopicExists()` will throw an `IllegalArgumentException`, right? Let me look at this and the other two closely and get back to you.





[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432868431



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,42 +299,50 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-      val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
-      val reassignments = listAllReassignments()
-      val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-      val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-
-      for (td <- topicDescriptions) {
-        val topicName = td.name
-        val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
-        val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-        if (describeOptions.describeConfigs) {
-          val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-          if (!opts.reportOverriddenConfigs || hasNonDefault) {
-            val numPartitions = td.partitions().size
-            val firstPartition = td.partitions.iterator.next()
-            val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
-            val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
-            topicDesc.printDescription()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {

Review comment:
       `val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())`
   
   This line still makes an RPC call.





[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432625688



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,6 +295,8 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       On avoiding `getTopics`: same issue. `AdminClient#describeTopics(..)` cannot handle regexes. I made the changes to use the trimmed-down `Admin#listPartitionReassignments`.





[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432814786



##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -223,8 +224,20 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     createAndWaitTopic(createOpts)
 
     // try to re-create the topic
-    intercept[IllegalArgumentException] {
+    intercept[TopicExistsException] {
+      topicService.createTopic(createOpts)
+    }
+  }
+
+  @Test
+  def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
+    val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists"))
+    createAndWaitTopic(createOpts)
+
+    try {
       topicService.createTopic(createOpts)
+    } catch {

Review comment:
       I guess personally I find it confusing since we're catching a particular type of exception specifically, but then just doing what we would have done anyway if we hadn't caught it (failing).  Also, the failure message "Alter topic should not throw exception" implies that it is catching all exceptions, when it is actually catching only some of the exceptions the code could throw.  So it's doubly confusing.
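
One way to read this comment: a test that expects no exception does not need a `try`/`catch` at all, because any exception that escapes the test body already fails it. The following is a rough sketch of that shape under stated assumptions. `FakeTopicService` and the local `TopicExistsException` are hypothetical in-memory stand-ins, not the real `topicService` or Kafka classes, and a plain method is used instead of a JUnit `@Test` so the example runs without extra dependencies.

```scala
import scala.collection.mutable

object TestStyleSketch {
  // Hypothetical stand-in for Kafka's TopicExistsException.
  final class TopicExistsException(msg: String) extends RuntimeException(msg)

  // Hypothetical in-memory stand-in for the topic service under test.
  final class FakeTopicService {
    private val topics = mutable.Set.empty[String]

    // Creating an existing topic is an error unless ifNotExists is set.
    def createTopic(name: String, ifNotExists: Boolean): Unit =
      if (!topics.add(name) && !ifNotExists)
        throw new TopicExistsException(s"Topic $name already exists")
  }

  // No try/catch: an exception of any type propagates out and fails the test.
  def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
    val service = new FakeTopicService
    service.createTopic("test-topic", ifNotExists = true)
    service.createTopic("test-topic", ifNotExists = true) // second call should be a silent no-op
  }

  def main(args: Array[String]): Unit = {
    testCreateWhenAlreadyExistsWithIfNotExists()
    println("test body completed without throwing")
  }
}
```

Written this way, the test does not single out one exception type, so it cannot give the misleading impression that only some failures are covered.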





[GitHub] [kafka] cmccabe commented on pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#issuecomment-637856338


   Thanks @vinothchandar !



[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432814325



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,42 +299,50 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-      val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
-      val reassignments = listAllReassignments()
-      val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-      val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-
-      for (td <- topicDescriptions) {
-        val topicName = td.name
-        val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
-        val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-        if (describeOptions.describeConfigs) {
-          val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-          if (!opts.reportOverriddenConfigs || hasNonDefault) {
-            val numPartitions = td.partitions().size
-            val firstPartition = td.partitions.iterator.next()
-            val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
-            val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
-            topicDesc.printDescription()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {

Review comment:
       It's not necessary to check this.  AdminClient handles being passed empty lists or sets of topics.





[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432171069



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,7 +263,8 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       Okay, so I did think about the case where `--if-exists` is provided and thus `ensureTopicExists()` moves on without erroring. But I chose to let whatever exception is thrown, like `NoSuchTopicOrPartitionException`, propagate back to the caller/main() method.
   
   I guess you are suggesting going to the server first and then handling the exception. I am fine changing it; it does seem cleaner.





[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
stanislavkozlovski commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r431950804



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -736,8 +743,8 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))

Review comment:
       Would it be easy to add a test for this? (ensure the bootstrap server doesn't throw)





[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432185717



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,7 +263,8 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       I was wondering about how the tests passed!  I guess I missed something important about `getTopics`-- the fact that it returns either one or zero topics.  (I forgot that zero was a possibility).
   
   The problem with how the code works today is that it's really very wasteful.  It's literally listing every topic in the cluster (which could be tens of thousands) just to see if the specific one you passed exists.  There is also the potential for a TOCTOU (time-of-check to time-of-use) issue here: for example, the topic existed but was deleted right before you went to use it.
   
   So I would like to see this fixed to avoid doing the getTopics.  I guess think about it and if it seems reasonable to do it in the PR, let's go for it.  Otherwise I can create a follow-up JIRA.
   
   We will need to fix stuff like this to scale Kafka up.  Listing the full set of all topics is bad for scalability.
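
To make the cost argument above concrete, here is a small sketch that contrasts "list everything, then act" with "act, then handle the error", which is the alternative raised elsewhere in this thread. Everything in it is an assumption for illustration: `FakeCluster`, `UnknownTopicException`, `deleteByListing`, and `deleteDirectly` are hypothetical names, and the in-memory cluster only counts how many topic names each approach pulls back.

```scala
object TopicLookupSketch {
  // Hypothetical stand-in for the broker-side "no such topic" error.
  final class UnknownTopicException(msg: String) extends RuntimeException(msg)

  // Hypothetical in-memory cluster that counts how many topic names it hands back to callers.
  final class FakeCluster(initial: Set[String]) {
    private var topics = initial
    var namesTransferred = 0

    def listTopics(): Set[String] = {
      namesTransferred += topics.size
      topics
    }

    def deleteTopic(name: String): Unit = {
      if (!topics.contains(name)) throw new UnknownTopicException(s"Topic $name does not exist")
      topics -= name
    }
  }

  // Check-then-act: fetches every topic name, and the topic could still vanish between check and delete.
  def deleteByListing(cluster: FakeCluster, name: String, ifExists: Boolean): Unit = {
    val found = cluster.listTopics().contains(name)
    if (found) cluster.deleteTopic(name)
    else if (!ifExists) throw new IllegalArgumentException(s"Topic $name does not exist")
  }

  // Act-then-handle: one targeted call; a missing topic is tolerated only when ifExists is set.
  def deleteDirectly(cluster: FakeCluster, name: String, ifExists: Boolean): Unit =
    try {
      cluster.deleteTopic(name)
    } catch {
      case e: UnknownTopicException => if (!ifExists) throw e
    }

  def main(args: Array[String]): Unit = {
    val names = (1 to 10000).map(i => s"topic-$i").toSet
    val a = new FakeCluster(names)
    deleteByListing(a, "missing", ifExists = true)
    val b = new FakeCluster(names)
    deleteDirectly(b, "missing", ifExists = true)
    println(s"listing transferred ${a.namesTransferred} names; direct call transferred ${b.namesTransferred}")
  }
}
```

The direct version also has no gap between check and use, since the existence check and the operation are the same call.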






[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432868132



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,42 +299,50 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-      val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
-      val reassignments = listAllReassignments()
-      val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-      val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-
-      for (td <- topicDescriptions) {
-        val topicName = td.name
-        val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
-        val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-        if (describeOptions.describeConfigs) {
-          val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-          if (!opts.reportOverriddenConfigs || hasNonDefault) {
-            val numPartitions = td.partitions().size
-            val firstPartition = td.partitions.iterator.next()
-            val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
-            val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
-            topicDesc.printDescription()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {
+        val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
+        val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
+        val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
+        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
+        val topicPartitions = topicDescriptions
+          .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition())))
+          .toSet.asJava
+        val reassignments = listAllReassignments(topicPartitions)
+
+        for (td <- topicDescriptions) {
+          val topicName = td.name
+          val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
+          val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
+
+          if (describeOptions.describeConfigs) {
+            val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
+            if (!opts.reportOverriddenConfigs || hasNonDefault) {
+              val numPartitions = td.partitions().size
+              val firstPartition = td.partitions.iterator.next()
+              val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
+              val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
+              topicDesc.printDescription()
+            }
           }
-        }
 
-        if (describeOptions.describePartitions) {
-          for (partition <- sortedPartitions) {
-            val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
-            val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
-            describeOptions.maybePrintPartitionDescription(partitionDesc)
+          if (describeOptions.describePartitions) {
+            for (partition <- sortedPartitions) {
+              val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
+              val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
+              describeOptions.maybePrintPartitionDescription(partitionDesc)
+            }
           }
         }
       }
     }
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      adminClient.deleteTopics(topics.asJavaCollection).all().get()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+      if (topics.nonEmpty)

Review comment:
       For `alter`: we call `KafkaAdminClient#createPartitions()`, which will make the RPC call even if `topics` at ~L2406 is empty.
   
   For `describe`: the `KafkaAdminClient#describeCluster()` call is still wasteful, no?
   
   For `delete`: it's actually handled and the RPC is avoided. I will remove the check.





[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432158637



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,7 +263,8 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       I don't think this is really enough to accomplish your goal here.  With this code, if the topic does not exist, we will get past this line but then fail on the next line in `describeTopics`, with a `NoSuchTopicOrPartitionException`.
   
   I think it would be better to simply catch the `NoSuchTopicOrPartitionException` and handle it appropriately like we did with topic creation.  This also avoids the redundant calls in `ensureTopicExists`.
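
A minimal sketch of the catch-based handling suggested above, not the code that was merged. To stay runnable without the Kafka client on the classpath it uses a hypothetical stand-in, `MissingTopicError`, in place of the client's missing-topic exception (as far as I know the real class is `UnknownTopicOrPartitionException`). The helper name `runIfExists` is also an assumption.

```scala
import java.util.concurrent.ExecutionException

// Hypothetical stand-in for the client's "unknown topic" error, so the sketch
// compiles without the Kafka client on the classpath.
final class MissingTopicError(topic: String)
  extends RuntimeException(s"Topic $topic does not exist")

object IfExistsSketch {
  /** Runs `op` and returns true if it completed. A missing-topic failure is
    * swallowed (returning false) only when `ifExists` is set; every other
    * failure, including an ExecutionException without a cause, propagates.
    */
  def runIfExists(ifExists: Boolean)(op: => Unit): Boolean =
    try {
      op
      true
    } catch {
      // isInstanceOf is false for a null cause, so no separate null check is needed.
      case e: ExecutionException
          if ifExists && e.getCause.isInstanceOf[MissingTopicError] =>
        false
    }
}
```

With this shape the command issues the request directly and only interprets the failure, so there is no separate existence check that could go stale before the real call.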







[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432159364



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,6 +295,8 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       Same issue here as above.  You can get past this line this way, but you still fail when actually calling `describeTopics`.
   
   Another issue with this code is that we should not be listing all partition reassignments.  `Admin#listPartitionReassignments` has a variant that takes a list of partitions.  We should use that so that we're not fetching a lot of information that we don't need.  I think this should be an easy fix
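
As a rough illustration of scoping the reassignment lookup, the sketch below derives the set of partitions from the topic descriptions already in hand. That set is what would be passed to the partition-scoped variant of `Admin#listPartitionReassignments`. All three case classes are hypothetical stand-ins for the client's description and partition types, used only so the snippet runs on its own.

```scala
// Hypothetical stand-ins for the client's topic description and partition types.
final case class PartitionStandIn(partition: Int)
final case class TopicDescStandIn(name: String, partitions: Seq[PartitionStandIn])
final case class TopicPartitionStandIn(topic: String, partition: Int)

object ReassignmentScopeSketch {
  /** Collects every (topic, partition) pair of the described topics, so that
    * reassignments are requested only for partitions the command will print.
    */
  def partitionsOf(descs: Iterable[TopicDescStandIn]): Set[TopicPartitionStandIn] =
    descs
      .flatMap(td => td.partitions.map(p => TopicPartitionStandIn(td.name, p.partition)))
      .toSet
}
```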







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r431978052



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -247,8 +247,12 @@ object TopicCommand extends Logging {
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
         createResult.all().get()
         println(s"Created topic ${topic.name}.")
-      } else {
-        throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
+      } catch {
+        case e : ExecutionException =>
+          if (e.getCause == null)
+            throw e
+          if (!e.getCause.isInstanceOf[TopicExistsException] || !topic.ifTopicDoesntExist())

Review comment:
       Part of the issue here is that I wanted to throw `e` if the cause is null, so I needed to check for that anyway.
   
   I was debating `||` vs `&&` there. With the `if` you mentioned, I would need to return out of the method instead of throwing. I prefer to keep this the way it is.







[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432162100



##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -223,8 +224,20 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     createAndWaitTopic(createOpts)
 
     // try to re-create the topic
-    intercept[IllegalArgumentException] {
+    intercept[TopicExistsException] {
+      topicService.createTopic(createOpts)
+    }
+  }
+
+  @Test
+  def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
+    val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists"))
+    createAndWaitTopic(createOpts)
+
+    try {
       topicService.createTopic(createOpts)
+    } catch {

Review comment:
       Hmm.  I don't see why these catch blocks are needed.  All we're doing here is just failing when we get an exception.  But an exception propagating to the top level of the test function fails the test anyway.  What am I missing?







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432814420



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,42 +299,50 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-      val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
-      val reassignments = listAllReassignments()
-      val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-      val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-
-      for (td <- topicDescriptions) {
-        val topicName = td.name
-        val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
-        val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-        if (describeOptions.describeConfigs) {
-          val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-          if (!opts.reportOverriddenConfigs || hasNonDefault) {
-            val numPartitions = td.partitions().size
-            val firstPartition = td.partitions.iterator.next()
-            val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
-            val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
-            topicDesc.printDescription()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {
+        val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
+        val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
+        val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
+        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
+        val topicPartitions = topicDescriptions
+          .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition())))
+          .toSet.asJava
+        val reassignments = listAllReassignments(topicPartitions)
+
+        for (td <- topicDescriptions) {
+          val topicName = td.name
+          val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
+          val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
+
+          if (describeOptions.describeConfigs) {
+            val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
+            if (!opts.reportOverriddenConfigs || hasNonDefault) {
+              val numPartitions = td.partitions().size
+              val firstPartition = td.partitions.iterator.next()
+              val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
+              val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
+              topicDesc.printDescription()
+            }
           }
-        }
 
-        if (describeOptions.describePartitions) {
-          for (partition <- sortedPartitions) {
-            val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
-            val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
-            describeOptions.maybePrintPartitionDescription(partitionDesc)
+          if (describeOptions.describePartitions) {
+            for (partition <- sortedPartitions) {
+              val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
+              val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
+              describeOptions.maybePrintPartitionDescription(partitionDesc)
+            }
           }
         }
       }
     }
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      adminClient.deleteTopics(topics.asJavaCollection).all().get()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+      if (topics.nonEmpty)

Review comment:
       Technically yes. That's why I did not have them at first. But they do make RPC calls with an empty topic list.
   
   Given we discussed efficiency a fair bit in this PR, I think we can keep this check for those reasons.







[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432159364



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,6 +295,8 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       Same issue here as above.  You can get past this line this way, but you still fail when actually calling `describeTopics`.
   
   Another issue with this code is that we should NOT be listing all partition reassignments.  `Admin#listPartitionReassignments` has a variant that takes a list of partitions.  We should use that so that we're not fetching a lot of information that we don't need.  I think this should be an easy fix







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r431916698



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -228,7 +228,7 @@ object TopicCommand extends Logging {
       if (topic.partitions.exists(partitions => partitions < 1))
         throw new IllegalArgumentException(s"The partitions must be greater than 0")
 
-      if (!adminClient.listTopics().names().get().contains(topic.name)) {
+      try {

Review comment:
       Removed this check; instead we let the request go to the server and fail (we need to handle that error anyway).







[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432159935



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -324,7 +331,7 @@ object TopicCommand extends Logging {
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       Same issue here as above.  `Admin#deleteTopics` will fail if the topic doesn't exist, regardless of line 334.







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432172108



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,6 +295,8 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       I will look into the new API. In this case `topics` will be empty, and I checked that `describeTopics()` will actually send an empty topic list. Let me look into simplifying this more.







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432164756



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -247,8 +247,12 @@ object TopicCommand extends Logging {
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
         createResult.all().get()
         println(s"Created topic ${topic.name}.")
-      } else {
-        throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
+      } catch {
+        case e : ExecutionException =>
+          if (e.getCause == null)
+            throw e
+          if (!e.getCause.isInstanceOf[TopicExistsException] || !topic.ifTopicDoesntExist())

Review comment:
       Yeah, I can move the `!` outside so it's easier to reason about. Personally, I am fine with both, but given we have two similar pieces of feedback now, I'm happy to change.







[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432162663



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -736,8 +743,8 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))

Review comment:
       I think a word got left out here :)







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432594472



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,7 +263,8 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       @cmccabe So this is a tad more complicated.
   
   In general, we need to call `getTopics` to deal with wildcard/regex arguments (all of these commands support them, even `alter`). To fix this, we need to move the resolution of regexes to actual topic names further into the AdminClient/server. This does not seem trivial to me. Specifically for `alter`, we need to pull down the existing partition metadata to add more partitions, so we have a bigger TOCTOU issue here anyway. Again, this is something that needs better admin client APIs.
   
   That said, I do like having an eye towards making all these tools more scalable down the line. I can create a parent JIRA and keep filing these issues under it (please let me know if a JIRA like that already exists).
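
To make the client-side resolution step concrete, here is a small sketch of what resolving a wildcard/regex argument against a listed set of topic names could look like. It is an illustration under assumptions, not the actual `getTopics` implementation: the object name, the `resolve` signature, and the hard-coded set of internal topic names are all made up for the example.

```scala
object TopicResolutionSketch {
  // Assumed set of internal topic names, for illustration only.
  val internalTopics: Set[String] = Set("__consumer_offsets", "__transaction_state")

  /** Filters a previously listed snapshot of topic names. An empty result
    * means "no topic matched", which is the case --if-exists has to tolerate.
    */
  def resolve(allTopics: Seq[String],
              pattern: Option[String],
              excludeInternal: Boolean): Seq[String] = {
    val candidates =
      if (excludeInternal) allTopics.filterNot(internalTopics) else allTopics
    pattern match {
      case Some(p) =>
        val regex = p.r
        // Require a full match of the topic name against the pattern.
        candidates.filter(name => regex.pattern.matcher(name).matches())
      case None => candidates
    }
  }
}
```

Because the names come from a snapshot taken before the follow-up request, any topic created or deleted in between is invisible to this step, which is the TOCTOU window mentioned above.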
   
   
   
   
   







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r431976104



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -736,8 +743,8 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))

Review comment:
       Sorry... what do you mean by `add a for this`? 







[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
stanislavkozlovski commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r431953983



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -247,8 +247,12 @@ object TopicCommand extends Logging {
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
         createResult.all().get()
         println(s"Created topic ${topic.name}.")
-      } else {
-        throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
+      } catch {
+        case e : ExecutionException =>
+          if (e.getCause == null)
+            throw e
+          if (!e.getCause.isInstanceOf[TopicExistsException] || !topic.ifTopicDoesntExist())

Review comment:
       nit: I had some trouble reading this, since it's two not operators with an OR statement. Would this be easier to read as `if e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist()` ? Note that [scala doesn't require a null check on the exception before calling isInstanceOf](https://stackoverflow.com/questions/14203201/does-scala-null-count-as-an-instance-of-another-type)
   
   I think it's more conventional in Scala to have full if/else branches rather than guard statements
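
For what it's worth, the two forms discussed here are De Morgan duals, and the sketch below checks that on all four combinations plus the null-cause case. `TopicExistsStandIn` is a hypothetical stand-in for `TopicExistsException`, and the method names are invented for the example.

```scala
import java.util.concurrent.ExecutionException

// Hypothetical stand-in for TopicExistsException.
final class TopicExistsStandIn extends RuntimeException("topic exists")

object CreateConditionSketch {
  // Guard form from the diff: true means "rethrow the exception".
  def rethrowGuard(e: ExecutionException, ifNotExists: Boolean): Boolean =
    !e.getCause.isInstanceOf[TopicExistsStandIn] || !ifNotExists

  // Positive form from the review: true means "swallow the exception".
  def swallow(e: ExecutionException, ifNotExists: Boolean): Boolean =
    e.getCause.isInstanceOf[TopicExistsStandIn] && ifNotExists
}
```

Since `isInstanceOf` evaluates to false on a null reference, an `ExecutionException` without a cause falls on the rethrow side in both forms even without an explicit null check.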







[GitHub] [kafka] vinothchandar commented on pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#issuecomment-636260668


   Opened https://issues.apache.org/jira/browse/KAFKA-10071 to track the follow-ups. @cmccabe let me know if you have further comments on the PR.





[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432162663



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -736,8 +743,8 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))

Review comment:
       @stanislavkozlovski : I think a word got left out here :)







[GitHub] [kafka] cmccabe commented on pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#issuecomment-636283732


   Thanks for working on this, @vinothchandar.  I think it's almost ready to go.  I left two comments.





[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432166896



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,7 +263,8 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       If the topic does not exist, `ensureTopicExists()` will throw an `IllegalArgumentException`, right? Let me look at this and the other two closely and get back to you.







[GitHub] [kafka] cmccabe commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432814353



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -290,42 +299,50 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-      val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
-      val reassignments = listAllReassignments()
-      val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-      val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-
-      for (td <- topicDescriptions) {
-        val topicName = td.name
-        val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
-        val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-        if (describeOptions.describeConfigs) {
-          val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-          if (!opts.reportOverriddenConfigs || hasNonDefault) {
-            val numPartitions = td.partitions().size
-            val firstPartition = td.partitions.iterator.next()
-            val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
-            val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
-            topicDesc.printDescription()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {
+        val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
+        val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
+        val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
+        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
+        val topicPartitions = topicDescriptions
+          .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition())))
+          .toSet.asJava
+        val reassignments = listAllReassignments(topicPartitions)
+
+        for (td <- topicDescriptions) {
+          val topicName = td.name
+          val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
+          val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
+
+          if (describeOptions.describeConfigs) {
+            val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
+            if (!opts.reportOverriddenConfigs || hasNonDefault) {
+              val numPartitions = td.partitions().size
+              val firstPartition = td.partitions.iterator.next()
+              val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
+              val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
+              topicDesc.printDescription()
+            }
           }
-        }
 
-        if (describeOptions.describePartitions) {
-          for (partition <- sortedPartitions) {
-            val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
-            val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
-            describeOptions.maybePrintPartitionDescription(partitionDesc)
+          if (describeOptions.describePartitions) {
+            for (partition <- sortedPartitions) {
+              val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
+              val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
+              describeOptions.maybePrintPartitionDescription(partitionDesc)
+            }
           }
         }
       }
     }
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      adminClient.deleteTopics(topics.asJavaCollection).all().get()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+      if (topics.nonEmpty)

Review comment:
       This check is not necessary. AdminClient accepts an empty set of topics to delete and simply does nothing.







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432174885



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -259,7 +263,8 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       So this is an execution of `testAlterWhenTopicDoesntExistWithIfExists`, which alters a non-existent topic using `--if-exists`. As you can see, `topics` is empty. The cleaner thing to do here seems to be to place any further calls inside an `if (topics.nonEmpty) { .. }` block.
   
   Is your intention here to avoid the calls to `getTopics()` and `ensureTopicExists()`? That was the model I was referring to above. If your concern is `describeTopics` failing, that does not seem to be happening. (I agree this code is on thin ice, though: if someone changes the behavior of `describeTopics`, this will break, but so will the test I added.)
   
   ![image](https://user-images.githubusercontent.com/1179324/83203309-038e0d80-a0fe-11ea-90a7-46abed714aa2.png)
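
A minimal sketch of the guard described in this comment, for illustration only. Only the call shape `ensureTopicExists(topics, opts.topic, !opts.ifExists)` comes from the diff; the method body, the parameter names, and the `alterSketch` helper are assumptions, and the `adminCall` callback stands in for the real AdminClient calls.

```scala
// Hypothetical sketch: names and bodies are assumptions, not the code in TopicCommand.scala.
object EnsureTopicExistsSketch {
  // Throws only when a topic was requested, nothing matched, and the caller
  // requires existence (that is, --if-exists was NOT passed).
  def ensureTopicExists(foundTopics: Seq[String],
                        requestedTopic: Option[String],
                        requireTopicExists: Boolean): Unit = {
    if (requireTopicExists && requestedTopic.isDefined && foundTopics.isEmpty)
      throw new IllegalArgumentException(s"Topic '${requestedTopic.get}' does not exist as expected")
  }

  // Stand-in for alterTopic: any further work is skipped when no topic matched.
  def alterSketch(foundTopics: Seq[String],
                  requestedTopic: Option[String],
                  ifExists: Boolean)(adminCall: Seq[String] => Unit): Unit = {
    ensureTopicExists(foundTopics, requestedTopic, !ifExists)
    if (foundTopics.nonEmpty) adminCall(foundTopics)
  }
}
```

With `ifExists = true` and an empty match list, the sketch returns without invoking `adminCall`; with `ifExists = false` it throws before reaching the guard.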
   







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432166675



##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -223,8 +224,20 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     createAndWaitTopic(createOpts)
 
     // try to re-create the topic
-    intercept[IllegalArgumentException] {
+    intercept[TopicExistsException] {
+      topicService.createTopic(createOpts)
+    }
+  }
+
+  @Test
+  def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
+    val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists"))
+    createAndWaitTopic(createOpts)
+
+    try {
       topicService.createTopic(createOpts)
+    } catch {

Review comment:
       I think the distinction is whether you want the test to error or to fail. Here I wanted the test to fail with a contextual message by calling `fail()`. It also makes the test read more explicitly: only the specific caught exception is turned into a test failure via `fail()`, while any other exception simply errors out the test.
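
The fail-versus-error distinction can be sketched roughly as follows. This is an illustration under stated assumptions, not the test from the PR: `TopicExistsException` and `fail` are local stand-ins defined here so the snippet runs without Kafka or JUnit on the classpath, and `createExpectingNoError` is a hypothetical helper.

```scala
// Hypothetical sketch: the exception class and fail() below are local stand-ins.
object FailVersusErrorSketch {
  final class TopicExistsException(msg: String) extends RuntimeException(msg)

  // Mimics a test framework's fail(): signals an assertion failure with context.
  def fail(message: String): Nothing = throw new AssertionError(message)

  // Runs `create`; the one anticipated exception becomes an explicit test failure,
  // while anything else propagates unchanged and surfaces as a test error.
  def createExpectingNoError(create: () => Unit): Unit =
    try create()
    catch {
      case e: TopicExistsException =>
        fail(s"--if-not-exists should have suppressed: ${e.getMessage}")
    }
}
```

Compared with letting every exception escape, this makes the anticipated regression show up with a message that names the flag, and leaves unrelated problems looking like what they are.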







[GitHub] [kafka] vinothchandar commented on a change in pull request #8737: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #8737:
URL: https://github.com/apache/kafka/pull/8737#discussion_r432626062



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -324,7 +331,7 @@ object TopicCommand extends Logging {
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)

Review comment:
       Like we discussed, `getTopics` guards against the issue you point out. Avoiding `getTopics` is again tricky because of regex/wildcard matching.
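
To illustrate why the regex matching makes `getTopics` hard to bypass, here is a rough sketch. It assumes the requested topic is treated as a plain Java regex over the full list of topic names; the real matching rules in `TopicCommand` may differ, and `getTopicsSketch` is a hypothetical name.

```scala
// Hypothetical sketch: assumes plain java.util.regex matching over known topic names.
object GetTopicsSketch {
  def getTopicsSketch(allTopics: Seq[String], requested: Option[String]): Seq[String] =
    requested match {
      // One --topic argument can expand to several topics, or to none at all.
      case Some(pattern) => allTopics.filter(_.matches(pattern))
      case None          => allTopics
    }
}
```

Because a single argument may resolve to zero, one, or many names, the caller cannot know which topics to act on without resolving the pattern against the cluster's topic list first.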



