You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/30 19:42:11 UTC

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

jeffkbkim commented on code in PR #12850:
URL: https://github.com/apache/kafka/pull/12850#discussion_r1036368544


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2921,57 +2921,188 @@ class KafkaApisTest {
     assertEquals(expectedTopicErrors, response.data.topics())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+  def testHandleLeaveGroupWithMultipleGroups(version: Short): Unit = {

Review Comment:
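   Should this be `MultipleMembers` or `BatchedMembers`? The request in this test targets a single group ("group") and batches two members, so `MultipleGroups` is misleading.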



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2921,57 +2921,188 @@ class KafkaApisTest {
     assertEquals(expectedTopicErrors, response.data.topics())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+  def testHandleLeaveGroupWithMultipleGroups(version: Short): Unit = {
+    def makeRequest(version: Short): RequestChannel.Request = {
+      buildRequest(new LeaveGroupRequest.Builder(
+        "group",
+        List(
+          new MemberIdentity()
+            .setMemberId("member-1")
+            .setGroupInstanceId("instance-1"),
+          new MemberIdentity()
+            .setMemberId("member-2")
+            .setGroupInstanceId("instance-2")
+        ).asJava
+      ).build(version))
+    }
+
+    if (version < 3) {
+      // Request version earlier than version 3 do not support batching members.
+      assertThrows(classOf[UnsupportedVersionException], () => makeRequest(version))
+    } else {
+      val requestChannelRequest = makeRequest(version)
+
+      val expectedLeaveGroupRequest = new LeaveGroupRequestData()
+        .setGroupId("group")
+        .setMembers(List(
+          new MemberIdentity()
+            .setMemberId("member-1")
+            .setGroupInstanceId("instance-1"),
+          new MemberIdentity()
+            .setMemberId("member-2")
+            .setGroupInstanceId("instance-2")
+        ).asJava)
+
+      val future = new CompletableFuture[LeaveGroupResponseData]()
+      when(newGroupCoordinator.leaveGroup(
+        requestChannelRequest.context,
+        expectedLeaveGroupRequest
+      )).thenReturn(future)
+
+      createKafkaApis().handleLeaveGroupRequest(requestChannelRequest)
+
+      val expectedLeaveResponse = new LeaveGroupResponseData()
+        .setErrorCode(Errors.NONE.code)
+        .setMembers(List(
+          new LeaveGroupResponseData.MemberResponse()
+            .setMemberId("member-1")
+            .setGroupInstanceId("instance-1"),
+          new LeaveGroupResponseData.MemberResponse()
+            .setMemberId("member-2")
+            .setGroupInstanceId("instance-2"),
+        ).asJava)
+
+      future.complete(expectedLeaveResponse)
+      val capturedResponse = verifyNoThrottling(requestChannelRequest)
+      val response = capturedResponse.getValue.asInstanceOf[LeaveGroupResponse]
+      assertEquals(expectedLeaveResponse, response.data)
+    }
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+  def testHandleLeaveGroupWithSingleGroup(version: Short): Unit = {

Review Comment:
   Maybe `SingleMember`? That would mirror the naming suggested for the batched-members test above.



##########
clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java:
##########
@@ -55,6 +56,22 @@ public LeaveGroupResponse(LeaveGroupResponseData data) {
         this.data = data;
     }
 
+    public LeaveGroupResponse(LeaveGroupResponseData data, short version) {
+        super(ApiKeys.LEAVE_GROUP);
+
+        if (version >= 3) {
+            this.data = data;
+        } else {
+            if (data.members().size() != 1) {
+                throw new UnsupportedVersionException("LeaveGroup response version " + version +
+                    " can only contain one member, got " + data.members().size() + " members.");
+            }
+
+            this.data = new LeaveGroupResponseData()

Review Comment:
   It might be more readable to use a `topLevelError` variable, as in L82; the line is a bit hard to understand as written.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1803,41 +1802,32 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleLeaveGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleLeaveGroupRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val leaveGroupRequest = request.body[LeaveGroupRequest]
 
-    val members = leaveGroupRequest.members.asScala.toList
-
-    if (!authHelper.authorize(request.context, READ, GROUP, leaveGroupRequest.data.groupId)) {
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending leave group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-        new LeaveGroupResponse(new LeaveGroupResponseData()
-          .setThrottleTimeMs(requestThrottleMs)
-          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
-        )
+        response.maybeSetThrottleTimeMs(requestThrottleMs)

Review Comment:
   Note: refactor this once https://github.com/apache/kafka/pull/12847 is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org