You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/15 13:42:39 UTC

[GitHub] [kafka] dajac opened a new pull request, #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

dajac opened a new pull request, #12855:
URL: https://github.com/apache/kafka/pull/12855

   This patch adds `describeGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1045499176


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2061,32 +2072,58 @@ class KafkaApisTest {
 
   @Test
   def testHandleDescribeGroupsAuthenticationFailed(): Unit = {
-    val describeGroupsRequest = new DescribeGroupsRequestData()
-      .setGroups(List(
-        "group-1",
-        "group-2",
-        "group-3"
-      ).asJava)
+    val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(List(
+      "group-1",

Review Comment:
   yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035413661


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-    val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
-
+    val futures = mutable.ArrayBuffer.empty[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        val ctx = makeGroupCoordinatorRequestContextFrom(request, RequestLocal.NoCaching)
+        futures += newGroupCoordinator.describeGroup(ctx, groupId).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {

Review Comment:
   Not a huge deal but was wondering how you decided what to include in the newGroupCoordinator method vs what to keep outside of it. The below lines about exception being null could be included right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043725962


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   There is not particular reason, I guess. @hachikuji The reason I kept it like this is that we will need to fan out any way with the new controller as we will have multiple event loops. I suppose that we could say that this is an internal implementation details so not exposing it in the interface would make sense. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043727578


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(
+          request.context,
+          groupId
+        ).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {
+            DescribeGroupsResponse.forError(groupId, Errors.forException(exception))

Review Comment:
   Sure. Let me clean this. I wonder if we should also remove `fromError` and push that code to `DescribeGroupsRequest.getErrorResponse` in order to be consistent with the other requests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac merged pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12855:
URL: https://github.com/apache/kafka/pull/12855


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1036275660


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   Did we want to refactor to include the new SendResponse method from the SyncGroup PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043700608


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   I wonder if it would be simpler to pass through all groups that we want to describe. Then we just have a single future.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(
+          request.context,
+          groupId
+        ).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {
+            DescribeGroupsResponse.forError(groupId, Errors.forException(exception))

Review Comment:
   A little outside the scope of this patch, but I think we can improve the name here. The class `DescribeGroupsResponse` has both `forError` and `fromError` and it's not really clear what the differences are from the naming. Since this is a group-level error, perhaps we can emphasize that in the name. Maybe we could call it `groupError` or something like that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043769009


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(
+          request.context,
+          groupId
+        ).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {
+            DescribeGroupsResponse.forError(groupId, Errors.forException(exception))

Review Comment:
   Yeah, I was wondering why we didn't use `getErrorResponse`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035407440


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -190,7 +190,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
         case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
         case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
-        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
+        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupsRequest(request)

Review Comment:
   I see we changed this to be plural -- was the reasoning because of the API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043708573


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   Any idea why the original semantics were to handle each group one by one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1045499820


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -80,8 +80,8 @@ import java.util
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Collections, Optional}
-

Review Comment:
   reverted this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035408521


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   did we want a trace log here too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035766224


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-    val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
-
+    val futures = mutable.ArrayBuffer.empty[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        val ctx = makeGroupCoordinatorRequestContextFrom(request, RequestLocal.NoCaching)
+        futures += newGroupCoordinator.describeGroup(ctx, groupId).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {
+            DescribeGroupsResponse.forError(
+              groupId,
+              Errors.forException(exception)
+            )
+          } else {
+            if (request.header.apiVersion >= 3 &&

Review Comment:
   let me put it on one line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1036297668


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   Yes, will do when that PR is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1044444236


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(
+          request.context,
+          groupId
+        ).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {
+            DescribeGroupsResponse.forError(groupId, Errors.forException(exception))

Review Comment:
   https://github.com/apache/kafka/pull/12970 was merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1044755787


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -190,25 +191,29 @@ class GroupCoordinatorAdapter(
 
   override def describeGroup(
     context: RequestContext,
-    groupId: String
-  ): CompletableFuture[DescribeGroupsResponseData.DescribedGroup] = {
-    val (error, summary) = coordinator.handleDescribeGroup(groupId)
+    groupIds: util.List[String]

Review Comment:
   do we want to rename this to describeGroups since we have multiple group IDs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1044754795


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -80,8 +80,8 @@ import java.util
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Collections, Optional}
-

Review Comment:
   nit: line here -- not sure what the expected format is though 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1044762017


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2061,32 +2072,58 @@ class KafkaApisTest {
 
   @Test
   def testHandleDescribeGroupsAuthenticationFailed(): Unit = {
-    val describeGroupsRequest = new DescribeGroupsRequestData()
-      .setGroups(List(
-        "group-1",
-        "group-2",
-        "group-3"
-      ).asJava)
+    val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(List(
+      "group-1",
+      "group-2",
+      "group-3"
+    ).asJava)
 
     val requestChannelRequest = buildRequest(new DescribeGroupsRequest.Builder(describeGroupsRequest).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
-    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
-      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    val acls = Map(
+      "group-1" -> AuthorizationResult.DENIED,
+      "group-2" -> AuthorizationResult.ALLOWED,
+      "group-3" -> AuthorizationResult.DENIED
+    )
+
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    val future = new CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
+    when(newGroupCoordinator.describeGroup(
+      requestChannelRequest.context,
+      List("group-2").asJava
+    )).thenReturn(future)
 
     createKafkaApis(authorizer = Some(authorizer)).handleDescribeGroupsRequest(requestChannelRequest)
 
-    val expectedDescribeGroupsResponse = new DescribeGroupsResponseData()
-      .setGroups(List(
-        new DescribeGroupsResponseData.DescribedGroup()
-          .setGroupId("group-1")
-          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code),
-        new DescribeGroupsResponseData.DescribedGroup()
-          .setGroupId("group-2")
-          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code),
-        new DescribeGroupsResponseData.DescribedGroup()
-          .setGroupId("group-3")
-          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)).asJava)
+    future.complete(List(
+      new DescribeGroupsResponseData.DescribedGroup()
+        .setGroupId("group-2")
+        .setErrorCode(Errors.NOT_COORDINATOR.code)
+    ).asJava)
+
+    val expectedDescribeGroupsResponse = new DescribeGroupsResponseData().setGroups(List(
+      // group-1 and group-3 are first because unauthorized are put first into the response.

Review Comment:
   I think this is a good addition to test. 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1044761059


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2061,32 +2072,58 @@ class KafkaApisTest {
 
   @Test
   def testHandleDescribeGroupsAuthenticationFailed(): Unit = {
-    val describeGroupsRequest = new DescribeGroupsRequestData()
-      .setGroups(List(
-        "group-1",
-        "group-2",
-        "group-3"
-      ).asJava)
+    val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(List(
+      "group-1",

Review Comment:
   was this change intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035764209


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -190,7 +190,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
         case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
         case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
-        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
+        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupsRequest(request)

Review Comment:
   That's right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1036272823


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-    val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
-
+    val futures = mutable.ArrayBuffer.empty[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        val ctx = makeGroupCoordinatorRequestContextFrom(request, RequestLocal.NoCaching)
+        futures += newGroupCoordinator.describeGroup(ctx, groupId).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {

Review Comment:
   Ah I see. Thanks for clarifying



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035767018


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   added it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035793556


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-    val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
-
+    val futures = mutable.ArrayBuffer.empty[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        val ctx = makeGroupCoordinatorRequestContextFrom(request, RequestLocal.NoCaching)
+        futures += newGroupCoordinator.describeGroup(ctx, groupId).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {

Review Comment:
   No, it could not because `newGroupCoordinator.describeGroup` could actually return a future containing an exception (expected or unexpected) so we have to handle this case here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1036275660


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   Did we want to refactor to include the new SendResponse method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035408371


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-    val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
-
+    val futures = mutable.ArrayBuffer.empty[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        val ctx = makeGroupCoordinatorRequestContextFrom(request, RequestLocal.NoCaching)
+        futures += newGroupCoordinator.describeGroup(ctx, groupId).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {
+            DescribeGroupsResponse.forError(
+              groupId,
+              Errors.forException(exception)
+            )
+          } else {
+            if (request.header.apiVersion >= 3 &&

Review Comment:
   nit: but does this need to be 3 lines?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043802283


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(
+          request.context,
+          groupId
+        ).handle[DescribeGroupsResponseData.DescribedGroup] { (response, exception) =>
+          if (exception != null) {
+            DescribeGroupsResponse.forError(groupId, Errors.forException(exception))

Review Comment:
   I did the refactor separately: https://github.com/apache/kafka/pull/12970.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043778147


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   I don't have a strong opinion, but I slightly prefer keeping `KafkaApis` simple and having the complexity in the coordinator implementation. I also wonder if that makes optimization easier. For example, even if we have fanout to the respective coordinators, it may still be the case that multiple groups are handled by the same coordinator. Perhaps that would mean fewer messages on the respective queues? Not sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043782746


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   Yeah, I think that I am with you on this point. Keeping `KafkaApis` simple makes sense. Let me give it a shot tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12855: KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1044275943


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = describeRequest.data.includeAuthorizedOperations
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && describeRequest.data.includeAuthorizedOperations) {
-            describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   I have refactored the code to address this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org