You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/11 23:54:55 UTC

[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

CalvinConfluent commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1067573213


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3110,61 +3110,69 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleOffsetDeleteRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+  def handleOffsetDeleteRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetDeleteRequest = request.body[OffsetDeleteRequest]
-    val groupId = offsetDeleteRequest.data.groupId
 
-    if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
-      val topics = offsetDeleteRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
+    if (!authHelper.authorize(request.context, DELETE, GROUP, offsetDeleteRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, offsetDeleteRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetDeleteRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetDeleteResponse.Builder
+      val authorizedTopicPartitions = new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection()
+      offsetDeleteRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName(topic.name)
 
-      val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
-      val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
+          topic.partitions.forEach { partition =>

Review Comment:
   The old code does not seem to check that all of the partitions exist. Is this an extra safety check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org