You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/23 01:45:26 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

cmccabe opened a new pull request #10184:
URL: https://github.com/apache/kafka/pull/10184


   Enable the new KIP-500 controller to delete topics.
   
   Fix a bug where feature level records were not correctly replayed.
   
   Fix a bug in TimelineHashMap#remove where the wrong type was being
   returned.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-786277018


   rebased on trunk


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584162388



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue {
+      case (name, idOrError) => if (idOrError.isError) {
+        unknownTopicNameErrors.put(name, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+
+    /**
+     * There are 5 error cases to handle here:
+     *
+     * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
+     * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
+     * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 5. id provided, topic doesn't exist, can't delete, maybe can describe => UNKNOWN_TOPIC_ID
+     */
+    controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        // Case 5: we can't resolve the given topic ID to a name to feed to the
+        // Authorizer.  Return UNKNOWN_TOPIC_ID.
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+    if (!hasClusterAuth) {
+      val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala
+      val authorizedDescribeTopics = getDescribableTopics(allTopicNames)
+      val authorizedDeleteTopics = getDeletableTopics(allTopicNames)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          // Case 1 or case 4: the topic exists, but we don't have permission to delete it.
+          val topicId = entry.getKey
+          if (topicIdsToResolve.contains(topicId)) {
+            appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))

Review comment:
       I think we need a case for describe vs. delete permissions, right? Will a client know they have describe vs. delete permissions?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584161583



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+    controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+
+    if (!hasClusterAuth) {
+      val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala)
+      val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          val topicId = entry.getKey
+          if (!authorizedDescribeTopics.contains(topicName)) {
+            // If the user is not authorized to describe the topic, we pretend that it

Review comment:
       For case 4, are we exposing that the topic exists by returning a different error in the case where we can't describe?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582256432



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")

Review comment:
       The error message should reflect the problem, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
rondagostino commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-789350896


   ```
   [2021-03-02T22:42:17.438Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10184/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:76:8: Unused import - java.util.Set. [UnusedImports]
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583984728



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);
+        log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
+    }

Review comment:
       I guess we haven't hooked up the logic to trigger the deletion of the replicas of the deleted topic in the broker?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);

Review comment:
       Should we update brokersToIsrs too?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586979443



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        // Remove this topic from the topics map and the topicsByName map.
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);
+
+        // Delete the configurations associated with this topic.
+        configurationControl.deleteTopicConfigs(topic.name);
+
+        // Remove the entries for this topic in brokersToIsrs.
+        for (PartitionControlInfo partition : topic.parts.values()) {
+            for (int i = 0; i < partition.isr.length; i++) {
+                brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
+            }
+        }
+        brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

Review comment:
       The test case `BrokersToIsrsTest.testNoLeader` suggests that it is a possible case. It looks like the path through `ReplicationControlManager.handleNodeDeactivated` could result in a `PartitionChangeRecord` which has leaderId set to -1. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584139776



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue {
+      case (name, idOrError) => if (idOrError.isError) {
+        unknownTopicNameErrors.put(name, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+
+    /**
+     * There are 5 error cases to handle here:
+     *
+     * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
+     * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
+     * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 5. id provided, topic doesn't exist, can't delete, maybe can describe => UNKNOWN_TOPIC_ID
+     */
+    controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        // Case 5: we can't resolve the given topic ID to a name to feed to the
+        // Authorizer.  Return UNKNOWN_TOPIC_ID.
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+    if (!hasClusterAuth) {
+      val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala
+      val authorizedDescribeTopics = getDescribableTopics(allTopicNames)
+      val authorizedDeleteTopics = getDeletableTopics(allTopicNames)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          // Case 1 or case 4: the topic exists, but we don't have permission to delete it.
+          val topicId = entry.getKey
+          if (topicIdsToResolve.contains(topicId)) {
+            appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      unknownTopicNameErrors.asScala.forKeyValue {
+        case (topicName, error) =>
+          if (authorizedDescribeTopics.contains(topicName)) {
+            // Case 2: the topic we tried to delete by name doesn't exist, and we have
+            // permission to know that.
+            appendResponse(topicName, ZERO_UUID, error)

Review comment:
       This method should be called if `hasClusterAuth` is true.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583450119



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +559,57 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(
+                    new ApiError(INVALID_REQUEST, "Invalid null topic name.")));

Review comment:
       This can be replaced by the constructor `public ResultOrError(Errors error, String message)`

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {

Review comment:
       How about using `DeletableTopicResultCollection` to replace `util.List[DeletableTopicResult]`? That change would eliminate an extra collection.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {

Review comment:
       Should we also remove the deleted topic's name from `topicsByName`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585895209



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+    controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+
+    if (!hasClusterAuth) {
+      val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala)
+      val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          val topicId = entry.getKey
+          if (!authorizedDescribeTopics.contains(topicName)) {
+            // If the user is not authorized to describe the topic, we pretend that it

Review comment:
       Oh sorry, this just got changed due to https://issues.apache.org/jira/browse/KAFKA-12394
   So the case where the user has neither delete nor describe permission and a topic ID is provided now returns TOPIC_AUTHORIZATION_FAILED. This may have been what you had initially.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-787089355


   @cmccabe I pushed a commit that removes a bunch of unnecessary `asScala` conversions (even though these don't copy the collection, they add a shallow allocation and indirection and the code is shorter with these changes - win/win).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583940888



##########
File path: core/src/test/java/kafka/test/MockController.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.ResultOrError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public class MockController implements Controller {
+    private final static NotControllerException NOT_CONTROLLER_EXCEPTION =
+        new NotControllerException("This is not the correct controller for this cluster.");
+
+    public static class Builder {
+        private final Map<String, MockTopic> initialTopics = new HashMap<>();
+
+        public Builder newInitialTopic(String name, Uuid id) {
+            initialTopics.put(name, new MockTopic(name, id));
+            return this;
+        }
+
+        public MockController build() {
+            return new MockController(initialTopics.values());
+        }
+    }
+
+    private volatile boolean active = true;
+
+    private MockController(Collection<MockTopic> initialTopics) {
+        for (MockTopic topic : initialTopics) {
+            topics.put(topic.id, topic);
+            topicNameToId.put(topic.name, topic.id);
+        }
+    }
+
+    @Override
+    public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> unregisterBroker(int brokerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    static class MockTopic {
+        private final String name;
+        private final Uuid id;
+
+        MockTopic(String name, Uuid id) {
+            this.name = name;
+            this.id = id;
+        }
+    }
+
+    private final Map<String, Uuid> topicNameToId = new HashMap<>();
+
+    private final Map<Uuid, MockTopic> topics = new HashMap<>();
+
+    @Override
+    synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>>
+            findTopicIds(Collection<String> topicNames) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String topicName : topicNames) {
+            if (!topicNameToId.containsKey(topicName)) {
+                System.out.println("WATERMELON: findTopicIds failed to find " + topicName);

Review comment:
       removed :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582255514



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized(

Review comment:
       thanks, this is a good point.  I fixed the code to accept only topic IDs.  I also found a few more bugs, and added a unit test for the deletion code in controller apis.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586739230



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +575,64 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name."));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(name, new ResultOrError<>(
+                        new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+                } else {
+                    results.put(name, new ResultOrError<>(id));
+                }
+            }
+        }
+        return results;
+    }
+
+    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
+        Map<Uuid, ResultOrError<String>> results = new HashMap<>();
+        for (Uuid id : ids) {
+            TopicControlInfo topic = topics.get(id, offset);
+            if (topic == null) {
+                results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID)));
+            } else {
+                results.put(id, new ResultOrError<>(topic.name));
+            }
+        }
+        return results;
+    }
+
+    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        Map<Uuid, ApiError> results = new HashMap<>();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Uuid id : ids) {
+            try {
+                deleteTopic(id, records);
+                results.put(id, ApiError.NONE);
+            } catch (ApiException e) {
+                results.put(id, ApiError.fromThrowable(e));
+            } catch (Exception e) {
+                log.error("Unexpected deleteTopics error for {}", id, e);
+                results.put(id, ApiError.fromThrowable(e));
+            }
+        }
+        return new ControllerResult<>(records, results);
+    }
+
+    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
+        TopicControlInfo topic = topics.get(id);
+        if (topic == null) {
+            throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
+        }
+        configurationControl.deleteTopicConfigs(topic.name);

Review comment:
       you're right, this should happen only in replay, not here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582257154



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +559,41 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    ControllerResult<DeleteTopicsResponseData> deleteTopics(Map<String, Uuid> nameToId) {
+        DeleteTopicsResponseData result = new DeleteTopicsResponseData();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Uuid> entry : nameToId.entrySet()) {
+            ApiError error = deleteTopic(entry.getKey(), entry.getValue(), records);
+            result.responses().add(new DeletableTopicResult().
+                setName(entry.getKey()).
+                setTopicId(entry.getValue()).
+                setErrorCode(error.error().code()).
+                setErrorMessage(error.message()));
+        }
+        return new ControllerResult<>(records, result);
+    }
+
+    ApiError deleteTopic(String name,
+                         Uuid providedId,
+                         List<ApiMessageAndVersion> records) {
+        Uuid realId = topicsByName.get(name);
+        if (realId == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
+                "Unable to locate the provided topic name.");
+        }
+        if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) {
+            return new ApiError(UNKNOWN_TOPIC_ID,
+                "The provided topic ID does not match the provided topic name.");
+        }
+        TopicControlInfo topic = topics.get(realId);
+        if (topic == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate topic id.");

Review comment:
       the code has changed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585895209



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+    controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+
+    if (!hasClusterAuth) {
+      val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala)
+      val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          val topicId = entry.getKey
+          if (!authorizedDescribeTopics.contains(topicName)) {
+            // If the user is not authorized to describe the topic, we pretend that it

Review comment:
       @cmccabe Oh sorry, this just got changed due to https://issues.apache.org/jira/browse/KAFKA-12394
   So the case where the user has neither delete nor describe permission and a topic ID is provided now returns TOPIC_AUTHORIZATION_FAILED. This may have been what you had initially.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-789256987


   Thanks for the reviews! I reworked the authentication, validation, and de-duplication code a lot.  The new logic should take into account the issues pointed out here. I resolved a few comment threads since they refer to code that was refactored-- please take another look if you get a chance.
   
   To clarify a bit, `RemoveTopicRecord` should imply some other effects:
   * All topic configs for the affected topic should be deleted
   * We should delete all the partitions of the deleted topic
   * We should remove the topic from `brokersToIsrs`
   
   The fact that it wasn't doing these things was a bug... it's fixed now.  This should also allow the ducktape test to work (cc @rondagostino )
   
   We also have a JIRA to follow up on the broker side: https://issues.apache.org/jira/browse/KAFKA-12403


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582266116



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+        val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+        (authorizedDescribeTopics, authorizedDeleteTopics)
+      }
+    }
+    def sendResponse(response: DeleteTopicsResponseData): Unit = {
+      nameToId.keysIterator.foreach {

Review comment:
       There is no copy when using `foreach` (in either the Java or the Scala version), and Scala lambdas are automatically converted to Java lambdas.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r587670430



##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -142,6 +148,199 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testDeleteTopicsByName(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData().setTopicNames(
+      util.Arrays.asList("foo", "bar", "quux", "quux"))
+    val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
+        setErrorMessage("This server does not host this topic-partition."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testDeleteTopicsById(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
+    val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(quuxId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()).
+        setErrorMessage("This server does not host this topic ID."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testInvalidDeleteTopicsRequest(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("foo").setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Neither topic name nor id were specified."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("You may not specify both topic name and topic id."),
+      new DeletableTopicResult().setName("bar").setTopicId(barId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("The provided topic name maps to an ID that was already supplied."),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName(null).setTopicId(bazId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      names => names.toSet,
+      names => names.toSet).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicExisting(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")
+    val quuxId = Uuid.fromString("5URoQzW_RJiERVZXJgUVLg")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).
+      newInitialTopic("baz", bazId).
+      newInitialTopic("quux", quuxId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo", "baz"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicNotExisting(): Unit = {
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().build()
+    val props = new Properties()
+    props.put(KafkaConfig.DeleteTopicEnableProp, "false")
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    val expectedResponse = Set(new DeletableTopicResult().setName("foo").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code).
+        setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code).
+        setErrorMessage(Errors.UNKNOWN_TOPIC_ID.message))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotControllerErrorPreventsDeletingTopics(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).build()
+    controller.setActive(false)

Review comment:
       See `QuorumController.QuorumMetaLogListener` for the callbacks that make the controller active or inactive.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585891911



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+    controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+
+    if (!hasClusterAuth) {
+      val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala)
+      val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          val topicId = entry.getKey
+          if (!authorizedDescribeTopics.contains(topicName)) {
+            // If the user is not authorized to describe the topic, we pretend that it

Review comment:
       You're right, the earlier list was not quite right. I've revised this a bit. It's now:
   
   ```
   no name or id => INVALID_REQUEST
   
   duplicate name or id => INVALID_REQUEST
   
   can't resolve topic id => UNKNOWN_TOPIC_ID
   
   can't locate topic name => UNKNOWN_TOPIC_OR_PARTITION
   
   no delete permission, no describe permission => UNKNOWN_TOPIC_ID (if topic id was provided) or UNKNOWN_TOPIC_OR_PARTITION (if topic name was provided)
   
   no delete permission, describe permission => TOPIC_AUTHORIZATION_FAILED with both name and id filled out correctly
   ```
   
   The new code should implement this correctly...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584628216



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -280,25 +281,34 @@ class ControllerApis(val requestChannel: RequestChannel,
       while (iterator.hasNext) {
         val entry = iterator.next()
         val topicName = entry.getValue
+        val topicId = entry.getKey
         if (!authorizedDeleteTopics.contains(topicName)) {
-          // Case 1 or case 4: the topic exists, but we don't have permission to delete it.
-          val topicId = entry.getKey
-          if (topicIdsToResolve.contains(topicId)) {
-            appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          if (authorizedDescribeTopics.contains(topicName)) {
+            if (topicNamesToResolve.contains(topicName)) {
+              // 6. name provided, topic exists, describable => TOPIC_AUTHORIZATION_FAILED
+              appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+            } else {
+              // 2. ID provided, topic present, describeable => TOPIC_AUTHORIZATION_FAILED
+              appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+            }
           } else {
-            appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+            if (topicNamesToResolve.contains(topicName)) {
+              // 7. name provided, topic exists, undescribable => UNKNOWN_TOPIC_OR_PARTITION
+              appendResponse(topicName, ZERO_UUID, new ApiError(UNKNOWN_TOPIC_OR_PARTITION))
+            } else {
+              // 3. ID provided, topic present, undescribeable => UNKNOWN_TOPIC_ID
+              appendResponse(null, topicId, new ApiError(UNKNOWN_TOPIC_ID))
+            }
           }
           iterator.remove()
         }
       }
       unknownTopicNameErrors.forEach { (topicName, error) =>

Review comment:
       `unknownTopicNameErrors` needs to be handled even if `hasClusterAuth` is true.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -256,17 +256,18 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
 
     /**
-     * There are 5 error cases to handle here:
+     * There are 6 error cases to handle here if we don't have permission to delete:

Review comment:
       There are 7 cases now.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -280,25 +281,34 @@ class ControllerApis(val requestChannel: RequestChannel,
       while (iterator.hasNext) {
         val entry = iterator.next()
         val topicName = entry.getValue
+        val topicId = entry.getKey
         if (!authorizedDeleteTopics.contains(topicName)) {
-          // Case 1 or case 4: the topic exists, but we don't have permission to delete it.
-          val topicId = entry.getKey
-          if (topicIdsToResolve.contains(topicId)) {
-            appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          if (authorizedDescribeTopics.contains(topicName)) {
+            if (topicNamesToResolve.contains(topicName)) {
+              // 6. name provided, topic exists, describable => TOPIC_AUTHORIZATION_FAILED
+              appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+            } else {
+              // 2. ID provided, topic present, describeable => TOPIC_AUTHORIZATION_FAILED
+              appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+            }
           } else {
-            appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+            if (topicNamesToResolve.contains(topicName)) {
+              // 7. name provided, topic exists, undescribable => UNKNOWN_TOPIC_OR_PARTITION
+              appendResponse(topicName, ZERO_UUID, new ApiError(UNKNOWN_TOPIC_OR_PARTITION))
+            } else {
+              // 3. ID provided, topic present, undescribeable => UNKNOWN_TOPIC_ID
+              appendResponse(null, topicId, new ApiError(UNKNOWN_TOPIC_ID))
+            }
           }
           iterator.remove()
         }
       }
       unknownTopicNameErrors.forEach { (topicName, error) =>
           if (authorizedDescribeTopics.contains(topicName)) {
-            // Case 2: the topic we tried to delete by name doesn't exist, and we have
-            // permission to know that.
+            // 4. name provided, topic missing, undescribable => UNKNOWN_TOPIC_OR_PARTITION

Review comment:
       This should be case 5 (describable) rather than case 4 (undescribable), since it is in the `authorizedDescribeTopics.contains(topicName)` branch rather than the `!authorizedDescribeTopics.contains(topicName)` branch.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -280,25 +281,34 @@ class ControllerApis(val requestChannel: RequestChannel,
       while (iterator.hasNext) {
         val entry = iterator.next()
         val topicName = entry.getValue
+        val topicId = entry.getKey
         if (!authorizedDeleteTopics.contains(topicName)) {
-          // Case 1 or case 4: the topic exists, but we don't have permission to delete it.
-          val topicId = entry.getKey
-          if (topicIdsToResolve.contains(topicId)) {
-            appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          if (authorizedDescribeTopics.contains(topicName)) {
+            if (topicNamesToResolve.contains(topicName)) {
+              // 6. name provided, topic exists, describable => TOPIC_AUTHORIZATION_FAILED
+              appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+            } else {
+              // 2. ID provided, topic present, describeable => TOPIC_AUTHORIZATION_FAILED
+              appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))

Review comment:
       Since the topic is describable, is it necessary to set the name to null?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-790862249


   Thanks for the reviews. I will merge this on behalf of @cmccabe to trunk and 2.8.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583084940



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_))
+
+    request.topics().iterator().asScala.foreach {
+      case topic => if (topic.name() == null) {

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r581122071



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {

Review comment:
       You can do `forEach` on the Java collection directly, right? A few other cases like that below.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+        val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+        (authorizedDescribeTopics, authorizedDeleteTopics)
+      }
+    }
+    def sendResponse(response: DeleteTopicsResponseData): Unit = {
+      nameToId.keysIterator.foreach {

Review comment:
       It's a bit odd to use iterators when calling `foreach` after. `iterator` is typically useful when you want to do `map` or other such operations lazily. `foreach` forces the iteration anyway.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]

Review comment:
       Nit: usually you would do `mutable.Map[String, Uuid]()` instead of using `HashMap` directly.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+        val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+        (authorizedDescribeTopics, authorizedDeleteTopics)
+      }
+    }
+    def sendResponse(response: DeleteTopicsResponseData): Unit = {
+      nameToId.keysIterator.foreach {
+        name => if (!deletable.contains(name)) {
+          val result = if (describable.contains(name)) {
+            new DeletableTopicResult().setName(name).setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)
+          } else {
+            new DeletableTopicResult().setName(name).setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code)
+          }
+          response.responses().add(result)
+        }
+      }
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        response.setThrottleTimeMs(throttleTimeMs)
+        new DeleteTopicsResponse(response)
+      })
+    }
+   val future = controller.deleteTopics(
+     nameToId.view.filterKeys(deletable.contains(_)).toMap.asJava)

Review comment:
       I would use `filter` and remove `view` and `toMap`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584007497



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);

Review comment:
       The ISRs should already have been updated by `BrokerChangeRecords` that were previously replayed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585942960



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -809,6 +824,27 @@ private QuorumController(LogContext logContext,
             () -> replicationControl.unregisterBroker(brokerId));
     }
 
+    @Override
+    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> names) {
+        if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicIds",
+            () -> replicationControl.findTopicIds(lastCommittedOffset, names));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicNames",
+            () -> replicationControl.findTopicNames(lastCommittedOffset, ids));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendWriteEvent("deleteTopics",
+            () -> replicationControl.deleteTopics(ids));

Review comment:
       Yes, it should be implicit based on the DeleteTopic record. I will fix the controller to do the right thing here.  We'll also need to have the broker do that too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r587131740



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        // Remove this topic from the topics map and the topicsByName map.
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);
+
+        // Delete the configurations associated with this topic.
+        configurationControl.deleteTopicConfigs(topic.name);
+
+        // Remove the entries for this topic in brokersToIsrs.
+        for (PartitionControlInfo partition : topic.parts.values()) {
+            for (int i = 0; i < partition.isr.length; i++) {
+                brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
+            }
+        }
+        brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

Review comment:
       Got it. The following comment confirmed this.
   
   ```
       /**
        * A map of broker IDs to the partitions that the broker is in the ISR for.
        * Partitions with no isr members appear in this map under id NO_LEADER.
        */
       private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;
   
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583962844



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())

Review comment:
       It's certainly awkward that for topic IDs, we need to check existence first (since otherwise we have nothing to give to the authorizer) but for topic names, we check authorization first. But you're right, this is a leak. I'll fix it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582599257



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_))
+
+    request.topics().iterator().asScala.foreach {
+      case topic => if (topic.name() == null) {

Review comment:
       The default value of `topic.name` is the empty string, so does this also need to check for an empty string?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -18,36 +18,42 @@
 package kafka.server
 
 import java.util
+import java.util.Collections
+import java.util.concurrent.ExecutionException
 
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.Logging
 import org.apache.kafka.clients.admin.AlterConfigOp
-import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
+import org.apache.kafka.common.Uuid.ZERO_UUID
+import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.ApiException
+import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, TopicDeletionDisabledException}
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
+import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DeleteTopicsRequestData, DeleteTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION}
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record.BaseRecords
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.Resource
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{Node, Uuid}
 import org.apache.kafka.controller.Controller
 import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange}
 import org.apache.kafka.server.authorizer.Authorizer
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+

Review comment:
       unintentional?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_))

Review comment:
       `foreach(maybeAppendToTopicNamesToResolve(_))` -> `foreach(maybeAppendToTopicNamesToResolve)`

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_))
+
+    request.topics().iterator().asScala.foreach {
+      case topic => if (topic.name() == null) {

Review comment:
       `case` is redundant

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +559,57 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(
+                    new ApiError(INVALID_REQUEST, "Invalid null topic name.")));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(null, new ResultOrError<>(

Review comment:
       Why does it pass `null` as the topic name?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/Controller.java
##########
@@ -69,6 +70,31 @@
      */
     CompletableFuture<Void> unregisterBroker(int brokerId);
 
+    /**
+     * Find the ids for topic names.
+     *
+     * @param topicNames    The topic names to resolve.
+     * @return              A future yielding a map from topic name to id.
+     */
+    CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> topicNames);
+
+    /**
+     * Find the names for topic ids.
+     *
+     * @param topicIds      The topic ids to resolve.
+     * @return              A future yielding a map from topic name to id.

Review comment:
       Should it be `id to topic name`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r580781460



##########
File path: config/log4j.properties
##########
@@ -61,11 +61,11 @@ log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.logger.org.apache.zookeeper=INFO
 
 # Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
-log4j.logger.kafka=INFO
-log4j.logger.org.apache.kafka=INFO
+log4j.logger.kafka=TRACE
+log4j.logger.org.apache.kafka=TRACE
 
 # Change to DEBUG or TRACE to enable request logging
-log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.logger.kafka.request.logger=TRACE, requestAppender

Review comment:
       Are these accidental?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583886777



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,

Review comment:
       There seems to be enough complexity in the handling here that it might be worth pulling this logic into a separate class. Not required for this PR, but it would be nice to come up with a nicer pattern so that we don't end up with a giant class like `KafkaApis`.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())

Review comment:
       If the controller does not have the topic id mapping, then the error here will be `UNKNOWN_TOPIC_OR_PARTITION`. As far as I can tell, this would get returned in the response to the client. This behavior differs from the handling logic in `KafkaApis` where we always check authorization first. The problem is that this implicitly leaks topic existence.

##########
File path: core/src/test/java/kafka/test/MockController.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.ResultOrError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public class MockController implements Controller {
+    private final static NotControllerException NOT_CONTROLLER_EXCEPTION =
+        new NotControllerException("This is not the correct controller for this cluster.");
+
+    public static class Builder {
+        private final Map<String, MockTopic> initialTopics = new HashMap<>();
+
+        public Builder newInitialTopic(String name, Uuid id) {
+            initialTopics.put(name, new MockTopic(name, id));
+            return this;
+        }
+
+        public MockController build() {
+            return new MockController(initialTopics.values());
+        }
+    }
+
+    private volatile boolean active = true;
+
+    private MockController(Collection<MockTopic> initialTopics) {
+        for (MockTopic topic : initialTopics) {
+            topics.put(topic.id, topic);
+            topicNameToId.put(topic.name, topic.id);
+        }
+    }
+
+    @Override
+    public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> unregisterBroker(int brokerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    static class MockTopic {
+        private final String name;
+        private final Uuid id;
+
+        MockTopic(String name, Uuid id) {
+            this.name = name;
+            this.id = id;
+        }
+    }
+
+    private final Map<String, Uuid> topicNameToId = new HashMap<>();
+
+    private final Map<Uuid, MockTopic> topics = new HashMap<>();
+
+    @Override
+    synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>>
+            findTopicIds(Collection<String> topicNames) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String topicName : topicNames) {
+            if (!topicNameToId.containsKey(topicName)) {
+                System.out.println("WATERMELON: findTopicIds failed to find " + topicName);

Review comment:
       Guessing you aren't planning to commit this 😉 .

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {

Review comment:
       nit: can you use `Implicits.forKeyValue`?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+    controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+
+    if (!hasClusterAuth) {
+      val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala)
+      val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          val topicId = entry.getKey
+          if (!authorizedDescribeTopics.contains(topicName)) {
+            // If the user is not authorized to describe the topic, we pretend that it

Review comment:
       The error should be `TOPIC_AUTHORIZATION_FAILED` if the client does not have describe permission regardless of existence.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r580825461



##########
File path: config/log4j.properties
##########
@@ -61,11 +61,11 @@ log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.logger.org.apache.zookeeper=INFO
 
 # Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
-log4j.logger.kafka=INFO
-log4j.logger.org.apache.kafka=INFO
+log4j.logger.kafka=TRACE
+log4j.logger.org.apache.kafka=TRACE
 
 # Change to DEBUG or TRACE to enable request logging
-log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.logger.kafka.request.logger=TRACE, requestAppender

Review comment:
       Yes, these are accidental.  Will revert.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584009213



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);
+        log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
+    }

Review comment:
       We haven't hooked that up yet, correct.  But that logic is in `BrokerMetadataListener`. It would probably be better to have a separate PR for that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584019199



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);

Review comment:
       Hmm, you mean PartitionChangeRecord? I don't see PartitionChangeRecord being generated from the topicDeletion request.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -809,6 +824,27 @@ private QuorumController(LogContext logContext,
             () -> replicationControl.unregisterBroker(brokerId));
     }
 
+    @Override
+    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> names) {
+        if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicIds",
+            () -> replicationControl.findTopicIds(lastCommittedOffset, names));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicNames",
+            () -> replicationControl.findTopicNames(lastCommittedOffset, ids));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendWriteEvent("deleteTopics",
+            () -> replicationControl.deleteTopics(ids));

Review comment:
       We also need to delete the configuration associated with the topic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584161583



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+    controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+
+    if (!hasClusterAuth) {
+      val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala)
+      val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          val topicId = entry.getKey
+          if (!authorizedDescribeTopics.contains(topicName)) {
+            // If the user is not authorized to describe the topic, we pretend that it

Review comment:
       For case 4, are we exposing that the topic exists by returning a different error (than in case 5) in the case where the client can't describe it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
rondagostino commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-789243745


   The test below, if added as `tests/kafkatest/sanity_checks/test_delete_topic.py`, fails for the Raft cases on this PR branch as of this moment because the broker fails to shut down.  The following appears in the controller log:
   
   ```
   [2021-03-02 21:41:13,354] INFO [Controller 1] Unfenced broker 1 has requested and been granted a controlled shutdown. (org.apache.kafka.controller.BrokerHeartbeatManager)
   [2021-03-02 21:41:13,355] WARN [Controller 1] org.apache.kafka.controller.QuorumController@3fa533f1: failed with unknown server exception RuntimeException at epoch 1 in 802 us.  Reverting to last committed offset 5. (org.apache.kafka.controller.QuorumController)
   java.lang.RuntimeException: Topic ID VnD54LHq2t3qq_m1WLasZg existed in isrMembers, but not in the topics map.
   	at org.apache.kafka.controller.ReplicationControlManager.handleNodeDeactivated(ReplicationControlManager.java:752)
   	at org.apache.kafka.controller.ReplicationControlManager.processBrokerHeartbeat(ReplicationControlManager.java:931)
   	at org.apache.kafka.controller.QuorumController$1.generateRecordsAndResult(QuorumController.java:911)
   	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:419)
   	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
   	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
   	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   Maybe add this system test to this PR as `tests/kafkatest/sanity_checks/test_delete_topic.py`?
   
   ```
   # Licensed to the Apache Software Foundation (ASF) under one or more
   # contributor license agreements.  See the NOTICE file distributed with
   # this work for additional information regarding copyright ownership.
   # The ASF licenses this file to You under the Apache License, Version 2.0
   # (the "License"); you may not use this file except in compliance with
   # the License.  You may obtain a copy of the License at
   #
   #    http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing, software
   # distributed under the License is distributed on an "AS IS" BASIS,
   # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   # See the License for the specific language governing permissions and
   # limitations under the License.
   
   
   from ducktape.mark import matrix
   from ducktape.mark.resource import cluster
   from ducktape.tests.test import Test
   
   from kafkatest.services.kafka import KafkaService, quorum
   from kafkatest.services.zookeeper import ZookeeperService
   import time
   
   class TestDeleteTopic(Test):
       """Sanity checks that we can create and delete a topic and then shutdown."""
       def __init__(self, test_context):
           super(TestDeleteTopic, self).__init__(test_context)
   
           self.topic = "test_topic"
           self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
           self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                     topics={self.topic: {"partitions": 1, "replication-factor": 1}},
                                     controller_num_nodes_override=1)
       def setUp(self):
           if self.zk:
               self.zk.start()
   
       @cluster(num_nodes=2)
       @matrix(metadata_quorum=quorum.all)
       def test_delete_topic(self, metadata_quorum):
           """
           Test that we can create and delete a topic and then shutdown
           """
           self.kafka.start()
           self.kafka.delete_topic(self.topic)
           time.sleep(10) # give it a bit to take effect
           self.kafka.stop() # explicit stop so that failure to stop fails the test
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585086425



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,17 +162,179 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+      request.context.apiVersion,
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator)).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error.code).
+        setErrorMessage(error.message))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames.forEach(maybeAppendToTopicNamesToResolve)
+
+    request.topics.forEach {
+      topic => if (topic.name == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId)
+        }
+      } else {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name)
+        } else {
+          appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().forEach { (name, idOrError) =>
+      if (idOrError.isError)
+        unknownTopicNameErrors.put(name, idOrError.error)
+      else
+        maybeAppendToIdToName(idOrError.result, name)
+    }
+
+    /**
+     * There are 6 error cases to handle here if we don't have permission to delete:
+     *
+     * 1. ID provided, topic missing => UNKNOWN_TOPIC_ID
+     * 2. ID provided, topic present, describeable => TOPIC_AUTHORIZATION_FAILED
+     * 3. ID provided, topic present, undescribeable => UNKNOWN_TOPIC_ID
+     * 4. name provided, topic missing, undescribable => UNKNOWN_TOPIC_OR_PARTITION

Review comment:
       These cases seem wrong. It should be the following:
   ```
        * 4. name provided, topic missing, undescribable => TOPIC_AUTHORIZATION_FAILED
        * 5. name provided, topic missing, describable => UNKNOWN_TOPIC_OR_PARTITION
        * 6. name provided, topic exists, undescribable => TOPIC_AUTHORIZATION_FAILED
   ```
   If the client does not have describe permission, then it should get `TOPIC_AUTHORIZATION_FAILED` regardless of whether the topic exists.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -809,6 +824,27 @@ private QuorumController(LogContext logContext,
             () -> replicationControl.unregisterBroker(brokerId));
     }
 
+    @Override
+    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> names) {
+        if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicIds",
+            () -> replicationControl.findTopicIds(lastCommittedOffset, names));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicNames",
+            () -> replicationControl.findTopicNames(lastCommittedOffset, ids));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendWriteEvent("deleteTopics",
+            () -> replicationControl.deleteTopics(ids));

Review comment:
       Is the deletion of topic configurations implicit based on the DeleteTopic record? I know we discussed this, but I'm unsure what the final outcome was. I don't see any logic for this in the broker listener, but the implementation looks incomplete anyway.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
rondagostino commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-784198059


   Successfully ran the system test that was previously failing due to an inability to delete topics:
   
   `TC_PATHS="tests/kafkatest/tests/core/replica_scale_test.py::ReplicaScaleTest.test_clean_bounce" bash tests/docker/run_tests.sh`
   ```
   test_id:    kafkatest.tests.core.replica_scale_test.ReplicaScaleTest.test_clean_bounce.topic_count=50.partition_count=34.replication_factor=3.metadata_quorum=REMOTE_RAFT
   status:     PASS
   run time:   8 minutes 17.307 seconds
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586990075



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        // Remove this topic from the topics map and the topicsByName map.
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);
+
+        // Delete the configurations associated with this topic.
+        configurationControl.deleteTopicConfigs(topic.name);
+
+        // Remove the entries for this topic in brokersToIsrs.
+        for (PartitionControlInfo partition : topic.parts.values()) {
+            for (int i = 0; i < partition.isr.length; i++) {
+                brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
+            }
+        }
+        brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

Review comment:
       It's true that a partition could have an ISR and no leader. However, in that case, `isrMembers` in brokersToIsrs will still be updated with keys taken from the replica IDs in the ISR, and the ISR will never have -1 in its list. The noLeader info is only stored in the value of `isrMembers`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586131042



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+      request.context.apiVersion,
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator)).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    // Check if topic deletion is enabled at all.
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    // The first step is to load up the names and IDs that have been provided by the
+    // request.  This is a bit messy because we support multiple ways of referring to
+    // topics (both by name and by id) and because we need to check for duplicates or
+    // other invalid inputs.
+    val responses = new util.ArrayList[DeletableTopicResult]
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error.code).
+        setErrorMessage(error.message))
+    }
+    val providedNames = new util.HashSet[String]
+    val duplicateProvidedNames = new util.HashSet[String]
+    val providedIds = new util.HashSet[Uuid]
+    val duplicateProvidedIds = new util.HashSet[Uuid]
+    def addProvidedName(name: String): Unit = {
+      if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+        duplicateProvidedNames.add(name)
+        providedNames.remove(name)
+      }
+    }
+    request.topicNames.forEach(addProvidedName)
+    request.topics.forEach {
+      topic => if (topic.name == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) {
+          duplicateProvidedIds.add(topic.topicId)
+          providedIds.remove(topic.topicId)
+        }
+      } else {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          addProvidedName(topic.name)
+        } else {
+          appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+    // Create error responses for duplicates.
+    duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+      new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+    duplicateProvidedIds.forEach(id => appendResponse(null, id,
+      new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+    // At this point we have all the valid names and IDs that have been provided.
+    // However, the Authorizer needs topic names as inputs, not topic IDs.  So
+    // we need to resolve all IDs to names.
+    val toAuthenticate = new util.HashSet[String]
+    toAuthenticate.addAll(providedNames)
+    val idToName = new util.HashMap[Uuid, String]
+    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+      if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        toAuthenticate.add(nameOrError.result())
+        idToName.put(id, nameOrError.result())
+      }
+    }
+    // Get the list of deletable topics (those we can delete) and the list of describeable
+    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
+    // exist, even when it does.
+    val topicsToAuthenticate = toAuthenticate.asScala
+    val (describeable, deletable) = if (hasClusterAuth) {
+      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+    } else {
+      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
+    }
+    // For each topic that was provided by ID, check if authentication failed.
+    // If so, remove it from the idToName map and create an error response for it.
+    val iterator = idToName.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val id = entry.getKey
+      val name = entry.getValue
+      if (!deletable.contains(name)) {
+        if (describeable.contains(name)) {
+          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        } else {
+          appendResponse(null, id, new ApiError(UNKNOWN_TOPIC_ID))

Review comment:
       Consensus was reached on #10223 that the error should be `TOPIC_AUTHORIZATION_FAILED` rather than `UNKNOWN_TOPIC_ID`
   
   see `KafkaApis` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1901)

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+      request.context.apiVersion,
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator)).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    // Check if topic deletion is enabled at all.
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    // The first step is to load up the names and IDs that have been provided by the
+    // request.  This is a bit messy because we support multiple ways of referring to
+    // topics (both by name and by id) and because we need to check for duplicates or
+    // other invalid inputs.
+    val responses = new util.ArrayList[DeletableTopicResult]
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error.code).
+        setErrorMessage(error.message))
+    }
+    val providedNames = new util.HashSet[String]
+    val duplicateProvidedNames = new util.HashSet[String]
+    val providedIds = new util.HashSet[Uuid]
+    val duplicateProvidedIds = new util.HashSet[Uuid]
+    def addProvidedName(name: String): Unit = {
+      if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+        duplicateProvidedNames.add(name)
+        providedNames.remove(name)
+      }
+    }
+    request.topicNames.forEach(addProvidedName)
+    request.topics.forEach {
+      topic => if (topic.name == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) {
+          duplicateProvidedIds.add(topic.topicId)
+          providedIds.remove(topic.topicId)
+        }
+      } else {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          addProvidedName(topic.name)
+        } else {
+          appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+    // Create error responses for duplicates.
+    duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+      new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+    duplicateProvidedIds.forEach(id => appendResponse(null, id,
+      new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+    // At this point we have all the valid names and IDs that have been provided.
+    // However, the Authorizer needs topic names as inputs, not topic IDs.  So
+    // we need to resolve all IDs to names.
+    val toAuthenticate = new util.HashSet[String]
+    toAuthenticate.addAll(providedNames)
+    val idToName = new util.HashMap[Uuid, String]
+    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+      if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        toAuthenticate.add(nameOrError.result())
+        idToName.put(id, nameOrError.result())
+      }
+    }
+    // Get the list of deletable topics (those we can delete) and the list of describeable
+    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
+    // exist, even when it does.
+    val topicsToAuthenticate = toAuthenticate.asScala
+    val (describeable, deletable) = if (hasClusterAuth) {
+      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+    } else {
+      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
+    }
+    // For each topic that was provided by ID, check if authentication failed.
+    // If so, remove it from the idToName map and create an error response for it.
+    val iterator = idToName.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val id = entry.getKey
+      val name = entry.getValue
+      if (!deletable.contains(name)) {
+        if (describeable.contains(name)) {
+          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        } else {
+          appendResponse(null, id, new ApiError(UNKNOWN_TOPIC_ID))
+        }
+        iterator.remove()
+      }
+    }
+    // For each topic that was provided by name, check if authentication failed.
+    // If so, create an error response for it.  Otherwise, add it to the idToName map.
+    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
+      if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error)
+      } else if (deletable.contains(name)) {
+        val id = idOrError.result()
+        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
+          // This is kind of a weird case: what if we supply topic ID X and also a name
+          // that maps to ID X?  In that case, _if authorization succeeds_, we end up
+          // here.  If authorization doesn't succeed, we refrain from commenting on the
+          // situation since it would reveal topic ID mappings.
+          duplicateProvidedIds.add(id)
+          idToName.remove(id)
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+            "The provided topic name maps to an ID that was already supplied."))
+        }
+      } else if (describeable.contains(name)) {
+        appendResponse(name, idOrError.result(), new ApiError(TOPIC_AUTHORIZATION_FAILED))
+      } else {
+        appendResponse(name, ZERO_UUID, new ApiError(UNKNOWN_TOPIC_OR_PARTITION))

Review comment:
       The topic ID is NOT sensitive (see the discussion at https://issues.apache.org/jira/browse/KAFKA-12369), so it is OK to return the topic ID. Also, the error code should be `TOPIC_AUTHORIZATION_FAILED`, since it lets the client fail fast (`UNKNOWN_TOPIC_OR_PARTITION` is a retryable error).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r587002273



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        // Remove this topic from the topics map and the topicsByName map.
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);
+
+        // Delete the configurations associated with this topic.
+        configurationControl.deleteTopicConfigs(topic.name);
+
+        // Remove the entries for this topic in brokersToIsrs.
+        for (PartitionControlInfo partition : topic.parts.values()) {
+            for (int i = 0; i < partition.isr.length; i++) {
+                brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
+            }
+        }
+        brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

Review comment:
       I stepped through `testNoLeader` and it seems that -1 can indeed be a key in `isrMembers`. The `noLeaderIterator` makes the expectation explicit. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-786461247


   rebased on trunk


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582257154



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +559,41 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    ControllerResult<DeleteTopicsResponseData> deleteTopics(Map<String, Uuid> nameToId) {
+        DeleteTopicsResponseData result = new DeleteTopicsResponseData();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Uuid> entry : nameToId.entrySet()) {
+            ApiError error = deleteTopic(entry.getKey(), entry.getValue(), records);
+            result.responses().add(new DeletableTopicResult().
+                setName(entry.getKey()).
+                setTopicId(entry.getValue()).
+                setErrorCode(error.error().code()).
+                setErrorMessage(error.message()));
+        }
+        return new ControllerResult<>(records, result);
+    }
+
+    ApiError deleteTopic(String name,
+                         Uuid providedId,
+                         List<ApiMessageAndVersion> records) {
+        Uuid realId = topicsByName.get(name);
+        if (realId == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
+                "Unable to locate the provided topic name.");
+        }
+        if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) {
+            return new ApiError(UNKNOWN_TOPIC_ID,
+                "The provided topic ID does not match the provided topic name.");
+        }
+        TopicControlInfo topic = topics.get(realId);
+        if (topic == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate topic id.");

Review comment:
       this is out of date




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584139113



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue {
+      case (name, idOrError) => if (idOrError.isError) {
+        unknownTopicNameErrors.put(name, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+
+    /**
+     * There are 5 error cases to handle here:
+     *
+     * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
+     * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
+     * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID
+     */
+    controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        // Case 5: we can't resolve the given topic ID to a name to feed to the
+        // Authorizer.  Return UNKNOWN_TOPIC_ID.
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+    if (!hasClusterAuth) {
+      val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala
+      val authorizedDescribeTopics = getDescribableTopics(allTopicNames)
+      val authorizedDeleteTopics = getDeletableTopics(allTopicNames)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          // Case 1 or case 4: the topic exists, but we don't have permission to delete it.
+          val topicId = entry.getKey
+          if (topicIdsToResolve.contains(topicId)) {
+            appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))

Review comment:
       Could you add a comment explaining why `name` must be null in this case?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue {
+      case (name, idOrError) => if (idOrError.isError) {
+        unknownTopicNameErrors.put(name, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+
+    /**
+     * There are 5 error cases to handle here:
+     *
+     * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
+     * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
+     * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID
+     */
+    controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        // Case 5: we can't resolve the given topic ID to a name to feed to the
+        // Authorizer.  Return UNKNOWN_TOPIC_ID.
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+    if (!hasClusterAuth) {
+      val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala
+      val authorizedDescribeTopics = getDescribableTopics(allTopicNames)
+      val authorizedDeleteTopics = getDeletableTopics(allTopicNames)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          // Case 1 or case 4: the topic exists, but we don't have permission to delete it.
+          val topicId = entry.getKey
+          if (topicIdsToResolve.contains(topicId)) {
+            appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      unknownTopicNameErrors.asScala.forKeyValue {
+        case (topicName, error) =>
+          if (authorizedDescribeTopics.contains(topicName)) {
+            // Case 2: the topic we tried to delete by name doesn't exist, and we have
+            // permission to know that.
+            appendResponse(topicName, ZERO_UUID, error)

Review comment:
       This method should also be called if `hasClusterAuth` is true.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue {
+      case (name, idOrError) => if (idOrError.isError) {
+        unknownTopicNameErrors.put(name, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+
+    /**
+     * There are 5 error cases to handle here:
+     *
+     * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
+     * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
+     * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID
+     */
+    controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        // Case 5: we can't resolve the given topic ID to a name to feed to the
+        // Authorizer.  Return UNKNOWN_TOPIC_ID.
+        appendResponse(null, id, nameOrError.error())

Review comment:
       Should it pass `UNKNOWN_TOPIC_ID` in this case (according to the comment)?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue {
+      case (name, idOrError) => if (idOrError.isError) {
+        unknownTopicNameErrors.put(name, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+
+    /**
+     * There are 5 error cases to handle here:
+     *
+     * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
+     * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
+     * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID
+     */
+    controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        // Case 5: we can't resolve the given topic ID to a name to feed to the
+        // Authorizer.  Return UNKNOWN_TOPIC_ID.
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+    if (!hasClusterAuth) {
+      val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala
+      val authorizedDescribeTopics = getDescribableTopics(allTopicNames)

Review comment:
       How about creating this variable only if `unknownTopicNameErrors` is not empty?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r581058347



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+        val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+        (authorizedDescribeTopics, authorizedDeleteTopics)
+      }
+    }
+    def sendResponse(response: DeleteTopicsResponseData): Unit = {
+      nameToId.keysIterator.foreach {
+        name => if (!deletable.contains(name)) {
+          val result = if (describable.contains(name)) {
+            new DeletableTopicResult().setName(name).setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)
+          } else {
+            new DeletableTopicResult().setName(name).setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code)
+          }
+          response.responses().add(result)
+        }
+      }
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        response.setThrottleTimeMs(throttleTimeMs)
+        new DeleteTopicsResponse(response)
+      })
+    }
+   val future = controller.deleteTopics(
+     nameToId.view.filterKeys(deletable.contains(_)).toMap.asJava)

Review comment:
       It can be simplified to `nameToId.view.filterKeys(deletable.contains).toMap.asJava`.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized(

Review comment:
       Is it possible that the request carries only topic IDs? If so, how do we find the related topic names and authorize them here?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +559,41 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    ControllerResult<DeleteTopicsResponseData> deleteTopics(Map<String, Uuid> nameToId) {
+        DeleteTopicsResponseData result = new DeleteTopicsResponseData();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Uuid> entry : nameToId.entrySet()) {
+            ApiError error = deleteTopic(entry.getKey(), entry.getValue(), records);
+            result.responses().add(new DeletableTopicResult().
+                setName(entry.getKey()).
+                setTopicId(entry.getValue()).
+                setErrorCode(error.error().code()).
+                setErrorMessage(error.message()));
+        }
+        return new ControllerResult<>(records, result);
+    }
+
+    ApiError deleteTopic(String name,
+                         Uuid providedId,
+                         List<ApiMessageAndVersion> records) {
+        Uuid realId = topicsByName.get(name);
+        if (realId == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
+                "Unable to locate the provided topic name.");
+        }
+        if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) {
+            return new ApiError(UNKNOWN_TOPIC_ID,
+                "The provided topic ID does not match the provided topic name.");
+        }
+        TopicControlInfo topic = topics.get(realId);
+        if (topic == null) {
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate topic id.");

Review comment:
       Is this a server-side bug? The topic ID exists in `topicsByName`, but there is no `TopicControlInfo`.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")

Review comment:
       How about using `throw Errors.INVALID_REQUEST.exception()`? That makes sure the exception corresponds to the expected `Errors` value.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583084940



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_))
+
+    request.topics().iterator().asScala.foreach {
+      case topic => if (topic.name() == null) {

Review comment:
       ok
   
   - [ ] 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582255681



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+        val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+        (authorizedDescribeTopics, authorizedDeleteTopics)
+      }
+    }
+    def sendResponse(response: DeleteTopicsResponseData): Unit = {
+      nameToId.keysIterator.foreach {
+        name => if (!deletable.contains(name)) {
+          val result = if (describable.contains(name)) {
+            new DeletableTopicResult().setName(name).setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)
+          } else {
+            new DeletableTopicResult().setName(name).setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code)
+          }
+          response.responses().add(result)
+        }
+      }
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        response.setThrottleTimeMs(throttleTimeMs)
+        new DeleteTopicsResponse(response)
+      })
+    }
+   val future = controller.deleteTopics(
+     nameToId.view.filterKeys(deletable.contains(_)).toMap.asJava)

Review comment:
       this has changed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586632247



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +575,64 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name."));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(name, new ResultOrError<>(
+                        new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+                } else {
+                    results.put(name, new ResultOrError<>(id));
+                }
+            }
+        }
+        return results;
+    }
+
+    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
+        Map<Uuid, ResultOrError<String>> results = new HashMap<>();
+        for (Uuid id : ids) {
+            TopicControlInfo topic = topics.get(id, offset);
+            if (topic == null) {
+                results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID)));
+            } else {
+                results.put(id, new ResultOrError<>(topic.name));
+            }
+        }
+        return results;
+    }
+
+    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        Map<Uuid, ApiError> results = new HashMap<>();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Uuid id : ids) {
+            try {
+                deleteTopic(id, records);
+                results.put(id, ApiError.NONE);
+            } catch (ApiException e) {
+                results.put(id, ApiError.fromThrowable(e));
+            } catch (Exception e) {
+                log.error("Unexpected deleteTopics error for {}", id, e);
+                results.put(id, ApiError.fromThrowable(e));
+            }
+        }
+        return new ControllerResult<>(records, results);
+    }
+
+    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
+        TopicControlInfo topic = topics.get(id);
+        if (topic == null) {
+            throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
+        }
+        configurationControl.deleteTopicConfigs(topic.name);

Review comment:
       Why do we need to do this both here and in `replay`?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +575,64 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();

Review comment:
       nit: I guess we could initialize the size (a couple similar cases below as well)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583965997



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+    controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+
+    if (!hasClusterAuth) {
+      val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala)
+      val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          val topicId = entry.getKey
+          if (!authorizedDescribeTopics.contains(topicName)) {
+            // If the user is not authorized to describe the topic, we pretend that it

Review comment:
       I think there are 5 cases:
   1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
   2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
   3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
   4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
   5. id provided, topic doesn't exist, can't delete, maybe can describe => UNKNOWN_TOPIC_ID




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583077539



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_))
+
+    request.topics().iterator().asScala.foreach {
+      case topic => if (topic.name() == null) {

Review comment:
       The KIP specifies that the string will be null if the topic ID is intended to be used instead. Setting the string to empty is not valid.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10184:
URL: https://github.com/apache/kafka/pull/10184


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582259112



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+    val nameToId = new mutable.HashMap[String, Uuid]
+    deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+      name => nameToId.put(name, Uuid.ZERO_UUID)
+    }
+    deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+      nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+    }
+    val (describable, deletable)  = {
+      if (authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME)) {
+        (nameToId.keySet, nameToId.keySet)
+      } else {
+        val authorizedDescribeTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+        val authorizedDeleteTopics: Set[String] = authHelper.filterByAuthorized(
+          request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+        (authorizedDescribeTopics, authorizedDeleteTopics)
+      }
+    }
+    def sendResponse(response: DeleteTopicsResponseData): Unit = {
+      nameToId.keysIterator.foreach {

Review comment:
       The main rationale for this is that I don't want Scala to perform any copying, and I know for sure that it won't if I give it an iterator.
   
   I wouldn't mind using Java's forEach directly but I couldn't find a good way to convert a scala closure to a java closure (maybe this is obvious and I missed it?)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584139776



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +163,162 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+    val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue {
+      case (name, idOrError) => if (idOrError.isError) {
+        unknownTopicNameErrors.put(name, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+
+    /**
+     * There are 5 error cases to handle here:
+     *
+     * 1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
+     * 3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
+     * 4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
+     * 5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID
+     */
+    controller.findTopicNames(topicIdsToResolve).get().asScala.forKeyValue {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        // Case 5: we can't resolve the given topic ID to a name to feed to the
+        // Authorizer.  Return UNKNOWN_TOPIC_ID.
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+    if (!hasClusterAuth) {
+      val allTopicNames = idToName.values().asScala ++ unknownTopicNameErrors.keySet().asScala
+      val authorizedDescribeTopics = getDescribableTopics(allTopicNames)
+      val authorizedDeleteTopics = getDeletableTopics(allTopicNames)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          // Case 1 or case 4: the topic exists, but we don't have permission to delete it.
+          val topicId = entry.getKey
+          if (topicIdsToResolve.contains(topicId)) {
+            appendResponse(null, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(topicName, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      unknownTopicNameErrors.asScala.forKeyValue {
+        case (topicName, error) =>
+          if (authorizedDescribeTopics.contains(topicName)) {
+            // Case 2: the topic we tried to delete by name doesn't exist, and we have
+            // permission to know that.
+            appendResponse(topicName, ZERO_UUID, error)

Review comment:
       This method should also be called when `hasClusterAuth` is true.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583900849



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {

Review comment:
       Unfortunately, that will not work since we have to shuffle it at the end




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-785354588


   rebased on trunk


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583940677



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,

Review comment:
       It is kind of frustrating that there is this much complexity in the "apis" class.  At least there is a good unit test for it now, though.  I hope that most other APIs won't be this complex.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585942730



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);

Review comment:
       Sorry, you're right: we need to remove this from `brokersToIsrs`.  Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583903614



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {

Review comment:
       Good catch.  Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r581120055



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")

Review comment:
       The error message would be less useful in that case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583084592



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +559,57 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(
+                    new ApiError(INVALID_REQUEST, "Invalid null topic name.")));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(null, new ResultOrError<>(

Review comment:
       Good catch. I fixed this and added a unit test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r587072161



##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -142,6 +148,199 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testDeleteTopicsByName(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData().setTopicNames(
+      util.Arrays.asList("foo", "bar", "quux", "quux"))
+    val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
+        setErrorMessage("This server does not host this topic-partition."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testDeleteTopicsById(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
+    val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(quuxId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()).
+        setErrorMessage("This server does not host this topic ID."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testInvalidDeleteTopicsRequest(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("foo").setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Neither topic name nor id were specified."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("You may not specify both topic name and topic id."),
+      new DeletableTopicResult().setName("bar").setTopicId(barId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("The provided topic name maps to an ID that was already supplied."),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName(null).setTopicId(bazId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      names => names.toSet,
+      names => names.toSet).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicExisting(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")
+    val quuxId = Uuid.fromString("5URoQzW_RJiERVZXJgUVLg")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).
+      newInitialTopic("baz", bazId).
+      newInitialTopic("quux", quuxId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo", "baz"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicNotExisting(): Unit = {
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().build()
+    val props = new Properties()
+    props.put(KafkaConfig.DeleteTopicEnableProp, "false")
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    val expectedResponse = Set(new DeletableTopicResult().setName("foo").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code).
+        setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code).
+        setErrorMessage(Errors.UNKNOWN_TOPIC_ID.message))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotControllerErrorPreventsDeletingTopics(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).build()
+    controller.setActive(false)

Review comment:
       This controller is a mock, so marking it inactive works well for this test. However, I could not find the corresponding check for controller activity in the production code. Could you point me to it?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +574,63 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size());
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name."));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(name, new ResultOrError<>(
+                        new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+                } else {
+                    results.put(name, new ResultOrError<>(id));
+                }
+            }
+        }
+        return results;
+    }
+
+    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
+        Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size());
+        for (Uuid id : ids) {

Review comment:
       Should it check `ZERO_UUID` (use `INVALID_REQUEST` instead of `UNKNOWN_TOPIC_ID`)?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        // Remove this topic from the topics map and the topicsByName map.
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +

Review comment:
       Is `UnknownTopicIdException` more suitable?

##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -142,6 +148,199 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testDeleteTopicsByName(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData().setTopicNames(
+      util.Arrays.asList("foo", "bar", "quux", "quux"))
+    val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
+        setErrorMessage("This server does not host this topic-partition."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testDeleteTopicsById(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
+    val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(quuxId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()).
+        setErrorMessage("This server does not host this topic ID."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testInvalidDeleteTopicsRequest(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("foo").setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Neither topic name nor id were specified."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("You may not specify both topic name and topic id."),
+      new DeletableTopicResult().setName("bar").setTopicId(barId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("The provided topic name maps to an ID that was already supplied."),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName(null).setTopicId(bazId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      names => names.toSet,
+      names => names.toSet).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicExisting(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")
+    val quuxId = Uuid.fromString("5URoQzW_RJiERVZXJgUVLg")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).
+      newInitialTopic("baz", bazId).
+      newInitialTopic("quux", quuxId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo", "baz"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicNotExisting(): Unit = {
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().build()
+    val props = new Properties()

Review comment:
       This `props` is not used in this test case.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -541,6 +574,63 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size());
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name."));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(name, new ResultOrError<>(
+                        new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+                } else {
+                    results.put(name, new ResultOrError<>(id));
+                }
+            }
+        }
+        return results;
+    }
+
+    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
+        Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size());
+        for (Uuid id : ids) {
+            TopicControlInfo topic = topics.get(id, offset);
+            if (topic == null) {
+                results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID)));
+            } else {
+                results.put(id, new ResultOrError<>(topic.name));
+            }
+        }
+        return results;
+    }
+
+    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        Map<Uuid, ApiError> results = new HashMap<>(ids.size());
+        List<ApiMessageAndVersion> records = new ArrayList<>();

Review comment:
       How about setting the initial capacity of `records`, e.g. `new ArrayList<>(ids.size())`?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -72,6 +76,9 @@
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;

Review comment:
       Now that this static member is imported, we can replace all `Errors.INVALID_REQUEST` with `INVALID_REQUEST` in this class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-789254543


   @rondagostino I believe that error will be fixed by https://issues.apache.org/jira/browse/KAFKA-12403.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582256432



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    if (!config.deleteTopicEnable) {
+      if (request.context.apiVersion() < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")

Review comment:
       As @ijuma said, in this case the error message gives extra information which would be very helpful to users. If we always wanted the same message, there would be no reason to have the string field in the wire protocol.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583084809



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_))

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586880128



##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -18,19 +18,26 @@
 package unit.kafka.server

Review comment:
       This is an existing issue, but we don't need `unit` in the package name.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        // Remove this topic from the topics map and the topicsByName map.
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);
+
+        // Delete the configurations associated with this topic.
+        configurationControl.deleteTopicConfigs(topic.name);
+
+        // Remove the entries for this topic in brokersToIsrs.
+        for (PartitionControlInfo partition : topic.parts.values()) {
+            for (int i = 0; i < partition.isr.length; i++) {
+                brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
+            }
+        }
+        brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

Review comment:
       Hmm, why do we need to remove the entry for broker -1? It doesn't seem that `brokersToIsrs` tracks that.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org