You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/08 18:42:47 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

hachikuji commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043700608


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   I wonder if it would be simpler to pass through all of the groups that we want to describe in a single call. Then we would have just a single future to handle.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(
+          request.context,
+          groupId
+        ).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {
+            DescribeGroupsResponse.forError(groupId, Errors.forException(exception))

Review Comment:
   This is a little outside the scope of this patch, but I think we can improve the name here. The class `DescribeGroupsResponse` has both `forError` and `fromError`, and it is not really clear from the names alone how they differ. Since this is a group-level error, perhaps we can emphasize that in the name. Maybe we could call it `groupError` or something like that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org