You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/05 09:36:14 UTC

[GitHub] [kafka] dajac commented on a change in pull request #8598: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

dajac commented on a change in pull request #8598:
URL: https://github.com/apache/kafka/pull/8598#discussion_r419970261



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
 
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()
+        } catch {
+          case e: ExecutionException => {
+            val cause = e.getCause
+            if (cause.isInstanceOf[TopicExistsException] || topic.ifTopicDoesntExist()) {

Review comment:
       Shouldn't we re-throw all exceptions except `TopicExistsException` if `topic.ifTopicDoesntExist`?

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
 
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()
+        } catch {
+          case e: ExecutionException => {
+            val cause = e.getCause
+            if (cause.isInstanceOf[TopicExistsException] || topic.ifTopicDoesntExist()) {
+              throw e
+            }
+          }
+        }
+
         println(s"Created topic ${topic.name}.")
       } else {
         throw new IllegalArgumentException(s"Topic ${topic.name} already exists")

Review comment:
       Apparently, we already verify the existence of the topic prior to creating it. Should we also handle `--if-not-exists` in this case?

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -257,21 +267,50 @@ object TopicCommand extends Logging {
     }
 
     override def alterTopic(opts: TopicCommandOptions): Unit = {
-      val topic = new CommandTopicPartition(opts)
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      if(opts.topicConfig.isDefined || opts.configsToDelete.isDefined) {
+        throw new RuntimeException("Using --config or --delete-config is not supported " +
+          "when altering a topic via the broker API.  Use kafka-configs.sh instead.")

Review comment:
       nit: Extra space after the first `.`.

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
 
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()

Review comment:
       nit: the parentheses are not necessary. There are other cases where they can be removed.

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -257,21 +267,50 @@ object TopicCommand extends Logging {
     }
 
     override def alterTopic(opts: TopicCommandOptions): Unit = {
-      val topic = new CommandTopicPartition(opts)
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      if(opts.topicConfig.isDefined || opts.configsToDelete.isDefined) {
+        throw new RuntimeException("Using --config or --delete-config is not supported " +
+          "when altering a topic via the broker API.  Use kafka-configs.sh instead.")
+      }
+      val tp = new CommandTopicPartition(opts)
+      if (tp.hasPartitions) {
+        println("WARNING: If partitions are increased for a topic that has a key, the partition " +
+          "logic or ordering of the messages will be affected")
+        val topicDescription = try {
+          adminClient.describeTopics(Collections.singleton(tp.name)).
+            all().get().get(tp.name)

Review comment:
       nit: unnecessary spaces before `all()`.

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -257,21 +267,50 @@ object TopicCommand extends Logging {
     }
 
     override def alterTopic(opts: TopicCommandOptions): Unit = {
-      val topic = new CommandTopicPartition(opts)
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      if(opts.topicConfig.isDefined || opts.configsToDelete.isDefined) {
+        throw new RuntimeException("Using --config or --delete-config is not supported " +
+          "when altering a topic via the broker API.  Use kafka-configs.sh instead.")
+      }
+      val tp = new CommandTopicPartition(opts)
+      if (tp.hasPartitions) {
+        println("WARNING: If partitions are increased for a topic that has a key, the partition " +
+          "logic or ordering of the messages will be affected")
+        val topicDescription = try {
+          adminClient.describeTopics(Collections.singleton(tp.name)).
+            all().get().get(tp.name)
+        } catch {
+          case e: ExecutionException => {
+            val cause = e.getCause
+            if (cause.isInstanceOf[TopicExistsException] && opts.ifExists) {
+              println(s"Ignoring non-existent topic ${tp.name}.")
+              return
+            } else {
+              throw e
+            }
           }
-          topicName -> NewPartitions.increaseTo(topic.partitions.get, newAssignment)
+        }
+        if (topicDescription.partitions().size() == tp.partitions.get) {
+          println(s"Topic ${tp.name} already has ${tp.partitions.get} partitions.  " +

Review comment:
       nit: extra space after the `.`. same below.

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -639,9 +688,9 @@ object TopicCommand extends Logging {
     private val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
       "if set when describing topics, only show topics that have overridden configs")
     private val ifExistsOpt = parser.accepts("if-exists",
-      "if set when altering or deleting or describing topics, the action will only execute if the topic exists. Not supported with the --bootstrap-server option.")
+      "if set when altering or deleting or describing topics, the action will only execute if the topic exists.")

Review comment:
       We need to update `checkArgs` to accept them with `bootstrap-server`. At the moment, the tool refuses them with the following error:
   ```
   Option "[if-not-exists]" can't be used with option "[bootstrap-server]"
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org