Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/14 16:09:03 UTC

[GitHub] [kafka] dajac opened a new pull request, #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

dajac opened a new pull request, #12850:
URL: https://github.com/apache/kafka/pull/12850

   This patch adds `leaveGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12850:
URL: https://github.com/apache/kafka/pull/12850#discussion_r1035687745


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1805,38 +1804,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleLeaveGroupRequest(request: RequestChannel.Request): Unit = {
     val leaveGroupRequest = request.body[LeaveGroupRequest]
 
-    val members = leaveGroupRequest.members.asScala.toList
-
-    if (!authHelper.authorize(request.context, READ, GROUP, leaveGroupRequest.data.groupId)) {
+    def sendResponse(response: AbstractResponse): Unit = {
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-        new LeaveGroupResponse(new LeaveGroupResponseData()

Review Comment:
   Added it.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12850:
URL: https://github.com/apache/kafka/pull/12850#discussion_r1036368544


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2921,57 +2921,188 @@ class KafkaApisTest {
     assertEquals(expectedTopicErrors, response.data.topics())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+  def testHandleLeaveGroupWithMultipleGroups(version: Short): Unit = {

Review Comment:
   should this be MultipleMembers or BatchedMembers?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2921,57 +2921,188 @@ class KafkaApisTest {
     assertEquals(expectedTopicErrors, response.data.topics())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+  def testHandleLeaveGroupWithMultipleGroups(version: Short): Unit = {
+    def makeRequest(version: Short): RequestChannel.Request = {
+      buildRequest(new LeaveGroupRequest.Builder(
+        "group",
+        List(
+          new MemberIdentity()
+            .setMemberId("member-1")
+            .setGroupInstanceId("instance-1"),
+          new MemberIdentity()
+            .setMemberId("member-2")
+            .setGroupInstanceId("instance-2")
+        ).asJava
+      ).build(version))
+    }
+
+    if (version < 3) {
+      // Request version earlier than version 3 do not support batching members.
+      assertThrows(classOf[UnsupportedVersionException], () => makeRequest(version))
+    } else {
+      val requestChannelRequest = makeRequest(version)
+
+      val expectedLeaveGroupRequest = new LeaveGroupRequestData()
+        .setGroupId("group")
+        .setMembers(List(
+          new MemberIdentity()
+            .setMemberId("member-1")
+            .setGroupInstanceId("instance-1"),
+          new MemberIdentity()
+            .setMemberId("member-2")
+            .setGroupInstanceId("instance-2")
+        ).asJava)
+
+      val future = new CompletableFuture[LeaveGroupResponseData]()
+      when(newGroupCoordinator.leaveGroup(
+        requestChannelRequest.context,
+        expectedLeaveGroupRequest
+      )).thenReturn(future)
+
+      createKafkaApis().handleLeaveGroupRequest(requestChannelRequest)
+
+      val expectedLeaveResponse = new LeaveGroupResponseData()
+        .setErrorCode(Errors.NONE.code)
+        .setMembers(List(
+          new LeaveGroupResponseData.MemberResponse()
+            .setMemberId("member-1")
+            .setGroupInstanceId("instance-1"),
+          new LeaveGroupResponseData.MemberResponse()
+            .setMemberId("member-2")
+            .setGroupInstanceId("instance-2"),
+        ).asJava)
+
+      future.complete(expectedLeaveResponse)
+      val capturedResponse = verifyNoThrottling(requestChannelRequest)
+      val response = capturedResponse.getValue.asInstanceOf[LeaveGroupResponse]
+      assertEquals(expectedLeaveResponse, response.data)
+    }
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+  def testHandleLeaveGroupWithSingleGroup(version: Short): Unit = {

Review Comment:
   maybe SingleMember?



##########
clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java:
##########
@@ -55,6 +56,22 @@ public LeaveGroupResponse(LeaveGroupResponseData data) {
         this.data = data;
     }
 
+    public LeaveGroupResponse(LeaveGroupResponseData data, short version) {
+        super(ApiKeys.LEAVE_GROUP);
+
+        if (version >= 3) {
+            this.data = data;
+        } else {
+            if (data.members().size() != 1) {
+                throw new UnsupportedVersionException("LeaveGroup response version " + version +
+                    " can only contain one member, got " + data.members().size() + " members.");
+            }
+
+            this.data = new LeaveGroupResponseData()

Review Comment:
   It might be more readable to use a `topLevelError` variable, as in L82; the line is a bit hard to understand.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1803,41 +1802,32 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleLeaveGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleLeaveGroupRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val leaveGroupRequest = request.body[LeaveGroupRequest]
 
-    val members = leaveGroupRequest.members.asScala.toList
-
-    if (!authHelper.authorize(request.context, READ, GROUP, leaveGroupRequest.data.groupId)) {
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending leave group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-        new LeaveGroupResponse(new LeaveGroupResponseData()
-          .setThrottleTimeMs(requestThrottleMs)
-          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
-        )
+        response.maybeSetThrottleTimeMs(requestThrottleMs)

Review Comment:
   note to refactor once https://github.com/apache/kafka/pull/12847 is merged





[GitHub] [kafka] jolshan commented on a diff in pull request #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12850:
URL: https://github.com/apache/kafka/pull/12850#discussion_r1036352732


##########
clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java:
##########
@@ -55,6 +56,22 @@ public LeaveGroupResponse(LeaveGroupResponseData data) {
         this.data = data;
     }
 
+    public LeaveGroupResponse(LeaveGroupResponseData data, short version) {
+        super(ApiKeys.LEAVE_GROUP);
+
+        if (version >= 3) {
+            this.data = data;
+        } else {
+            if (data.members().size() != 1) {
+                throw new UnsupportedVersionException("LeaveGroup response version " + version +
+                    " can only contain one member, got " + data.members().size() + " members.");
+            }
+
+            this.data = new LeaveGroupResponseData()

Review Comment:
   Are you referring to
   `final short errorCode = getError(topLevelError, memberResponses).code();` ?
   I guess it is the same, but done in more lines. The arguments to getError are also a bit clearer.
   
   But maybe not a huge deal





[GitHub] [kafka] jolshan commented on a diff in pull request #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12850:
URL: https://github.com/apache/kafka/pull/12850#discussion_r1028565156


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1805,38 +1804,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleLeaveGroupRequest(request: RequestChannel.Request): Unit = {
     val leaveGroupRequest = request.body[LeaveGroupRequest]
 
-    val members = leaveGroupRequest.members.asScala.toList
-
-    if (!authHelper.authorize(request.context, READ, GROUP, leaveGroupRequest.data.groupId)) {
+    def sendResponse(response: AbstractResponse): Unit = {
       requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-        new LeaveGroupResponse(new LeaveGroupResponseData()

Review Comment:
   In some of the other PRs we had trace logs. Did we want to do that here?





[GitHub] [kafka] dajac merged pull request #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12850:
URL: https://github.com/apache/kafka/pull/12850






[GitHub] [kafka] jolshan commented on a diff in pull request #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12850:
URL: https://github.com/apache/kafka/pull/12850#discussion_r1028558389


##########
clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java:
##########
@@ -55,6 +56,22 @@ public LeaveGroupResponse(LeaveGroupResponseData data) {
         this.data = data;
     }
 
+    public LeaveGroupResponse(LeaveGroupResponseData data, short version) {
+        super(ApiKeys.LEAVE_GROUP);
+
+        if (version >= 3) {
+            this.data = data;
+        } else {
+            if (data.members().size() != 1) {
+                throw new UnsupportedVersionException("LeaveGroup response version " + version +
+                    " can only contain one member, got " + data.members().size() + " members.");
+            }
+
+            this.data = new LeaveGroupResponseData()

Review Comment:
   This seems to be normalizing the response? It seems like we just need to set the error code, but it's done in a way that's a little confusing. Why do we need `getError` when we have `data.errorCode`?
   
   Is it because it can be the top level error OR the single member error?





[GitHub] [kafka] dajac commented on a diff in pull request #12850: KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12850:
URL: https://github.com/apache/kafka/pull/12850#discussion_r1035689436


##########
clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java:
##########
@@ -55,6 +56,22 @@ public LeaveGroupResponse(LeaveGroupResponseData data) {
         this.data = data;
     }
 
+    public LeaveGroupResponse(LeaveGroupResponseData data, short version) {
+        super(ApiKeys.LEAVE_GROUP);
+
+        if (version >= 3) {
+            this.data = data;
+        } else {
+            if (data.members().size() != 1) {
+                throw new UnsupportedVersionException("LeaveGroup response version " + version +
+                    " can only contain one member, got " + data.members().size() + " members.");
+            }
+
+            this.data = new LeaveGroupResponseData()

Review Comment:
   That's right. It is because it can be the top level error OR the member error. This is how it was done before, see L82.
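
The "top level error OR member error" rule discussed in this thread can be illustrated with a minimal, self-contained sketch. It is not the code from the PR: the class `LeaveGroupErrorSketch`, the methods `resolveError` and `errorCodeForVersion`, and the use of plain `short` codes in place of Kafka's `Errors` and `MemberResponse` types are all hypothetical stand-ins, and the numeric codes are only illustrative. The assumption is that a non-zero top-level error takes precedence and that otherwise the first non-zero member error is reported.

```java
import java.util.List;

public class LeaveGroupErrorSketch {
    // Hypothetical stand-in for "no error"; real code would use an Errors enum.
    static final short NONE = 0;

    // Assumed rule: the top-level error wins; otherwise report the first member-level error.
    static short resolveError(short topLevelError, List<Short> memberErrors) {
        if (topLevelError != NONE) {
            return topLevelError;
        }
        for (short code : memberErrors) {
            if (code != NONE) {
                return code;
            }
        }
        return NONE;
    }

    // Versions below 3 carry exactly one member, so that member's error is folded
    // into the single response error code. Version 3 and above keep the top-level
    // error unchanged because member errors travel in the member list.
    static short errorCodeForVersion(short version, short topLevelError, List<Short> memberErrors) {
        if (version >= 3) {
            return topLevelError;
        }
        if (memberErrors.size() != 1) {
            throw new IllegalArgumentException("LeaveGroup response version " + version
                + " can only contain one member, got " + memberErrors.size() + " members.");
        }
        return resolveError(topLevelError, memberErrors);
    }

    public static void main(String[] args) {
        // Illustrative codes only: 25 as a member-level error, 30 as a top-level error.
        System.out.println(errorCodeForVersion((short) 2, NONE, List.of((short) 25)));
        System.out.println(errorCodeForVersion((short) 2, (short) 30, List.of((short) 25)));
        System.out.println(errorCodeForVersion((short) 3, NONE, List.of((short) 25, NONE)));
    }
}
```

Under these assumptions, a pre-v3 response with no top-level error surfaces the single member's error, which is why reading `data.errorCode` alone would not be enough.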


