You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/14 08:35:18 UTC

[GitHub] [kafka] dajac opened a new pull request, #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

dajac opened a new pull request, #12845:
URL: https://github.com/apache/kafka/pull/12845

   This patch adds `joinGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
   
   For the context, I will do the same for all the other interactions with the current group coordinator. In order to limit the changes, I have chosen to introduce the `GroupCoordinatorAdapter` that translates the new interface to the old one. It is basically a wrapper. This allows keeping the current group coordinator untouched for now and focus on the `KafkaApis` changes. Eventually, we can remove `GroupCoordinatorAdapter`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1024835046


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   I can remove the `From` part. I am not too opinionated on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026101492


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2530,208 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
-  }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
+  }
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(null)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-  def testJoinGroupProtocolType(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
+  }
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+  @Test
+  def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    capturedCallback.getValue.apply(JoinGroupResult(
-      members = List.empty,
-      memberId = memberId,
-      generationId = 0,
-      protocolType = Some(protocolType),
-      protocolName = Some(protocolName),
-      leaderId = memberId,
-      skipAssignment = true,
-      error = Errors.NONE
-    ))
+
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
+  }
 
-    assertEquals(Errors.NONE, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(0, response.data.generationId)
-    assertEquals(memberId, response.data.leader)
-    assertEquals(protocolName, response.data.protocolName)
-    assertEquals(protocolType, response.data.protocolType)
-    assertTrue(response.data.skipAssignment)
+  @Test
+  def testHandleJoinGroupRequestUnexpectedException(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
+
+    val response = new AtomicReference[JoinGroupResponse]()

Review Comment:
   i just needed a container to store the response. i could have used a var initialized to null as well, i suppose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026780137


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+    val joinGroupRequest = request.body[JoinGroupRequest]
 
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response

Review Comment:
   If this returns unit, is there a reason we say response here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1316692147

   @jeffkbkim @jolshan Thanks for your comments. I have addressed your feedback. Note that I have extracted the changes about the JoinGroupResponse version handling in a separate PR: https://github.com/apache/kafka/pull/12864. It is less risky this way.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1323780346

   > I left a comment about leaving more comments on tests, not a big deal either way, but just wanted to remind.
   
   @jolshan Thanks. I thought about this and I feel like the test is self explanatory.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1324739555

   I have addressed Jason's comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021208615


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
-
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolType(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-  def testJoinGroupProtocolType(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
+  }
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+  @Test
+  def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {

Review Comment:
   This is a new test. This code path was not tested before from `KafkaApisTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021208132


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
-
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolType(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {

Review Comment:
   This is now named `testHandleJoinGroupRequest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021205683


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val protocolName = if (request.context.apiVersion() >= 7)
-          joinResult.protocolName.orNull
-        else
-          joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(protocolName)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava)
-        )
-
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
     if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
     } else if (!authHelper.authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) {
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else {
-      val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)

Review Comment:
   This code is not in `GroupCoordinatorAdaptor` without any changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021207157


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {

Review Comment:
   This test's logic is not tested in `GroupCoordinatorAdaptorTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1030179126


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+public class GroupCoordinatorRequestContext {
+
+    private final short apiVersion;

Review Comment:
   Yeah, I agree with you. I went with the minimal to start with.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026099763


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   1. Yeah, that's right. I return a future to catch any other errors. It could be from `sendResponse` but it could also be other issues.
   2. No, it would not because the underlying API uses a callback.
   3. All APIs will be async for this reason but not that most of them are already async today. The difference is that we use a future here instead of a callback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1029114602


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+public class GroupCoordinatorRequestContext {

Review Comment:
   nit: Should we add `toString` for debugging?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,207 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
-  }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
+  }
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(null)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-  def testJoinGroupProtocolType(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
+  }
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+  @Test
+  def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    capturedCallback.getValue.apply(JoinGroupResult(
-      members = List.empty,
-      memberId = memberId,
-      generationId = 0,
-      protocolType = Some(protocolType),
-      protocolName = Some(protocolName),
-      leaderId = memberId,
-      skipAssignment = true,
-      error = Errors.NONE
-    ))
+
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
+  }
 
-    assertEquals(Errors.NONE, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(0, response.data.generationId)
-    assertEquals(memberId, response.data.leader)
-    assertEquals(protocolName, response.data.protocolName)
-    assertEquals(protocolType, response.data.protocolType)
-    assertTrue(response.data.skipAssignment)
+  @Test
+  def testHandleJoinGroupRequestUnexpectedException(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
+
+    var response: JoinGroupResponse = null
+    when(requestChannel.sendResponse(any(), any(), any())).thenAnswer { _ =>
+      throw new Exception("Something went wrong")
+    }.thenAnswer { invocation =>
+      response = invocation.getArgument(1, classOf[JoinGroupResponse])
+    }
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    future.completeExceptionally(Errors.NOT_COORDINATOR.exception)
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR, response.error)

Review Comment:
   nit: Could we add a comment here to mention we expected to catch the exception in `sendResponse`, and handleError will handle it? Otherwise, it is confusing why don't we get NOT_COORDINATOR error in response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1024712573


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+    apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      apiVersion,
+      "client",
+      InetAddress.getLocalHost,
+      BufferSupplier.create()
+    )
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {

Review Comment:
   that makes sense. thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1319647546

   @jeffkbkim Thanks for the review. I have addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026881513


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+    val joinGroupRequest = request.body[JoinGroupRequest]
 
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response

Review Comment:
   hmm… sendResponseMaybeThrottle takes a function which returns a response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023263070


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+    apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      apiVersion,
+      "client",
+      InetAddress.getLocalHost,
+      BufferSupplier.create()
+    )
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    val ctx = makeContext(version)
+    val data = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")

Review Comment:
   Did we want to add a test for the null protocol?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026894950


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+    val joinGroupRequest = request.body[JoinGroupRequest]
 
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response

Review Comment:
   Sorry I misread the method here. 🤦‍♀️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049342


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+    val joinGroupRequest = request.body[JoinGroupRequest]
 
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
     if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())

Review Comment:
   All handlers are already surrounded by a try..catch that catches all the exceptions raised. We just need to ensure that exceptions raised in futures’ callbacks are also caught.
   
   In this particular case, any exceptions raised by sendResponse would be caught by that try..catch. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026102820


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2530,208 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
-  }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
+  }
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(null)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-  def testJoinGroupProtocolType(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
+  }
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+  @Test
+  def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    capturedCallback.getValue.apply(JoinGroupResult(
-      members = List.empty,
-      memberId = memberId,
-      generationId = 0,
-      protocolType = Some(protocolType),
-      protocolName = Some(protocolName),
-      leaderId = memberId,
-      skipAssignment = true,
-      error = Errors.NONE
-    ))
+
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
+  }
 
-    assertEquals(Errors.NONE, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(0, response.data.generationId)
-    assertEquals(memberId, response.data.leader)
-    assertEquals(protocolName, response.data.protocolName)
-    assertEquals(protocolType, response.data.protocolType)
-    assertTrue(response.data.skipAssignment)
+  @Test
+  def testHandleJoinGroupRequestUnexpectedException(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
+
+    val response = new AtomicReference[JoinGroupResponse]()
+    when(requestChannel.sendResponse(any(), any(), any())).thenAnswer { _ =>
+      throw new Exception("Something went wrong")
+    }.thenAnswer { invocation =>
+      val resp = invocation.getArgument(1, classOf[JoinGroupResponse])
+      response.set(resp)
+    }
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )

Review Comment:
   it does not. keep in mind that we use a future internally so the method is executed and returns immediately. then, the future is completed later on by `future.completeExceptionally(Errors.NOT_COORDINATOR.exception)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1322572419

   @dajac This is looking close to ready. I just had a few points.
   1. I left a comment about leaving more comments on tests, not a big deal either way, but just wanted to remind.
   2. I see a lot of mirror maker tests failing that I couldn't find on other branches. It is likely unrelated, but wanted to confirm.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1323781375

   @jeffkbkim @jolshan @showuon Thanks for your comments. I have addressed them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023275236


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
-
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolType(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {

Review Comment:
   Is this also a new test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021208132


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
-
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolType(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {

Review Comment:
   This is not `testHandleJoinGroupRequest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023270208


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {

Review Comment:
   Git reallly made the comparison hard for these tests. 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021206505


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -94,6 +95,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val metadataSupport: MetadataSupport,
                 val replicaManager: ReplicaManager,
                 val groupCoordinator: GroupCoordinator,
+                val newGroupCoordinator: org.apache.kafka.coordinator.group.GroupCoordinator,

Review Comment:
   This is a temporary change. When all the API are migrated to new new interface, I will change `groupCoordinator` to the new interface and remove this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1318257181

   Rebased the PR. Ready for second round.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1027036420


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+    val joinGroupRequest = request.body[JoinGroupRequest]
 
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
     if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())

Review Comment:
   `newGroupCoordinator.joinGroup()` captures exceptions inside the future returned to `handleJoinGroupRequest()` which is tested by the unit test. if we return `CompletableFuture.completedFuture()` here and in line 1694, would we be capturing the exceptions thrown during sendResponse()? not sure if that would happen, but thought it was worth mentioning since we seem to be simulating that in the unit test.
   
   i guess i'm wondering whether we are handling these cases differently and if so, why



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2530,208 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
-  }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
+  }
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(null)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-  def testJoinGroupProtocolType(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
+  }
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+  @Test
+  def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    capturedCallback.getValue.apply(JoinGroupResult(
-      members = List.empty,
-      memberId = memberId,
-      generationId = 0,
-      protocolType = Some(protocolType),
-      protocolName = Some(protocolName),
-      leaderId = memberId,
-      skipAssignment = true,
-      error = Errors.NONE
-    ))
+
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
+  }
 
-    assertEquals(Errors.NONE, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(0, response.data.generationId)
-    assertEquals(memberId, response.data.leader)
-    assertEquals(protocolName, response.data.protocolName)
-    assertEquals(protocolType, response.data.protocolType)
-    assertTrue(response.data.skipAssignment)
+  @Test
+  def testHandleJoinGroupRequestUnexpectedException(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
+
+    val response = new AtomicReference[JoinGroupResponse]()
+    when(requestChannel.sendResponse(any(), any(), any())).thenAnswer { _ =>
+      throw new Exception("Something went wrong")
+    }.thenAnswer { invocation =>
+      val resp = invocation.getArgument(1, classOf[JoinGroupResponse])
+      response.set(resp)
+    }
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )

Review Comment:
   gotcha. i think i was missing this from the CompletableFuture java docs
   
   > Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
   
   so the thread that completes the future will attempt to finish all (non-async) dependent chains



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   i am also curious, that is one of my confusions :sweat_smile: 
   
   > All APIs will be async for this reason but not that most of them are already async today
   
   can you elaborate on the reason and clarify whether most of the APIs are async or not? i was under the impression that a single API request -> response is handled synchronously by a request handler thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1024713679


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   the `From` part i guess. i don't think i've seen a method name like it, though it does make sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023210180


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val protocolName = if (request.context.apiVersion() >= 7)
-          joinResult.protocolName.orNull
-        else
-          joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(protocolName)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava)
-        )
-
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
     if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
     } else if (!authHelper.authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) {
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else {
-      val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)

Review Comment:
   Did you mean that it is in the adapter? I see it at line 50



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023212872


##########
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java:
##########
@@ -28,9 +28,15 @@ public class JoinGroupResponse extends AbstractResponse {
 
     private final JoinGroupResponseData data;
 
-    public JoinGroupResponse(JoinGroupResponseData data) {
+    public JoinGroupResponse(JoinGroupResponseData data, short version) {
         super(ApiKeys.JOIN_GROUP);
         this.data = data;
+
+        // All versions prior to version 7 do not support nullable
+        // string for the protocol type. Empty string should be used.
+        if (version < 7 && data.protocolType() == null) {

Review Comment:
   I see that this was made nullable in version 7, but we are now on version 9. Did we just not enforce this before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023212872


##########
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java:
##########
@@ -28,9 +28,15 @@ public class JoinGroupResponse extends AbstractResponse {
 
     private final JoinGroupResponseData data;
 
-    public JoinGroupResponse(JoinGroupResponseData data) {
+    public JoinGroupResponse(JoinGroupResponseData data, short version) {
         super(ApiKeys.JOIN_GROUP);
         this.data = data;
+
+        // All versions prior to version 7 do not support nullable
+        // string for the protocol type. Empty string should be used.
+        if (version < 7 && data.protocolType() == null) {

Review Comment:
   I see that this was made nullable in version 7, but we are now on version 9. Did we just not check this before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023478149


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import java.util.concurrent.CompletableFuture
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapter(

Review Comment:
   can we add a comment that this is a wrapper around the existing group coordinator?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+    apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      apiVersion,
+      "client",
+      InetAddress.getLocalHost,
+      BufferSupplier.create()
+    )
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    val ctx = makeContext(version)
+    val data = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+      .setReason("reason")
+      .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(List(
+        new JoinGroupRequestProtocol()
+          .setName("first")
+          .setMetadata("first".getBytes()),
+        new JoinGroupRequestProtocol()
+          .setName("second")
+          .setMetadata("second".getBytes())).iterator.asJava))
+
+    val future = adapter.joinGroup(ctx, data)
+    assertFalse(future.isDone)
+
+    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] =
+      ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val capturedCallback: ArgumentCaptor[JoinGroupCallback] =
+      ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+
+    verify(groupCoordinator).handleJoinGroup(
+      ArgumentMatchers.eq(data.groupId),
+      ArgumentMatchers.eq(data.memberId),
+      ArgumentMatchers.eq(None),
+      ArgumentMatchers.eq(if (version >= 4) true else false),
+      ArgumentMatchers.eq(if (version >= 9) true else false),
+      ArgumentMatchers.eq(ctx.clientId),
+      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
+      ArgumentMatchers.eq(data.rebalanceTimeoutMs),
+      ArgumentMatchers.eq(data.sessionTimeoutMs),
+      ArgumentMatchers.eq(data.protocolType),
+      capturedProtocols.capture(),
+      capturedCallback.capture(),
+      ArgumentMatchers.eq(Some("reason")),
+      ArgumentMatchers.eq(RequestLocal(ctx.bufferSupplier))
+    )
+
+    assertEquals(List(
+      ("first", "first"),
+      ("second", "second")
+    ), capturedProtocols.getValue.map { case (name, metadata) =>
+      (name, new String(metadata))
+    }.toList)

Review Comment:
   do we need toList?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+    apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      apiVersion,
+      "client",
+      InetAddress.getLocalHost,
+      BufferSupplier.create()
+    )
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {

Review Comment:
   maybe a more descriptive test name would be useful here



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {

Review Comment:
   do you mean "now"?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   the naming seems slightly off. how's createGroupCoordinatorRequestContext?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -94,6 +95,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val metadataSupport: MetadataSupport,
                 val replicaManager: ReplicaManager,
                 val groupCoordinator: GroupCoordinator,
+                val newGroupCoordinator: org.apache.kafka.coordinator.group.GroupCoordinator,

Review Comment:
   should we add a comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023271511


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+    apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      apiVersion,
+      "client",
+      InetAddress.getLocalHost,
+      BufferSupplier.create()
+    )
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {

Review Comment:
   Can you clarify a bit on how this test is different from the kafka apis ones?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023603789


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {

Review Comment:
   Right. Let me fix it.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {

Review Comment:
   This test's logic is now tested in `GroupCoordinatorAdaptorTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021205253


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val protocolName = if (request.context.apiVersion() >= 7)
-          joinResult.protocolName.orNull
-        else
-          joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()

Review Comment:
   This translation is now in `GroupCoordinatorAdapter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1025980136


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -161,6 +166,12 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Top-level method that handles all requests and multiplexes to the right api
    */
   override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+    def handleError(e: Throwable): Unit = {
+      trace(s"Unexpected error handling request ${request.requestDesc(true)} " +

Review Comment:
   it looks like this message was an error level log and this change affects all other apis. what's the reason for changing it to trace?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2530,208 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
-  }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
+  }
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(null)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-  def testJoinGroupProtocolType(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
+  }
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+  @Test
+  def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    capturedCallback.getValue.apply(JoinGroupResult(
-      members = List.empty,
-      memberId = memberId,
-      generationId = 0,
-      protocolType = Some(protocolType),
-      protocolName = Some(protocolName),
-      leaderId = memberId,
-      skipAssignment = true,
-      error = Errors.NONE
-    ))
+
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
+  }
 
-    assertEquals(Errors.NONE, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(0, response.data.generationId)
-    assertEquals(memberId, response.data.leader)
-    assertEquals(protocolName, response.data.protocolName)
-    assertEquals(protocolType, response.data.protocolType)
-    assertTrue(response.data.skipAssignment)
+  @Test
+  def testHandleJoinGroupRequestUnexpectedException(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
+
+    val response = new AtomicReference[JoinGroupResponse]()

Review Comment:
   can you help me understand the reason for using an atomic reference? 



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   some questions for my understanding:
   1. it looks like we handle exceptions thrown by the new joinGroup in `newGroupCoordinator.joinGroup(ctx, joinGroupRequest.data).handle[Unit]`. are we returning a future here to handle exceptions thrown during `sendResponse()`? it looks to me that would be handled by `case e: Throwable => handleError(e)` or am i missing something?
   3. would `GroupCoordinatorAdapter.joinGroup()` work without returning a CompletableFuture object? 
   4. are we returning CompletableFuture in GroupCoordinatorAdapter to prepare it for the new group coordinator since it will use multiple threads to handle a single join/sync group request?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2530,208 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
-  }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
+  }
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(null)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-  def testJoinGroupProtocolType(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val protocolName = "range"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
+  }
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
+  @Test
+  def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    capturedCallback.getValue.apply(JoinGroupResult(
-      members = List.empty,
-      memberId = memberId,
-      generationId = 0,
-      protocolType = Some(protocolType),
-      protocolName = Some(protocolName),
-      leaderId = memberId,
-      skipAssignment = true,
-      error = Errors.NONE
-    ))
+
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
+  }
 
-    assertEquals(Errors.NONE, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(0, response.data.generationId)
-    assertEquals(memberId, response.data.leader)
-    assertEquals(protocolName, response.data.protocolName)
-    assertEquals(protocolType, response.data.protocolType)
-    assertTrue(response.data.skipAssignment)
+  @Test
+  def testHandleJoinGroupRequestUnexpectedException(): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.JOIN_GROUP.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(joinGroupRequest)
+    )).thenReturn(future)
+
+    val response = new AtomicReference[JoinGroupResponse]()
+    when(requestChannel.sendResponse(any(), any(), any())).thenAnswer { _ =>
+      throw new Exception("Something went wrong")
+    }.thenAnswer { invocation =>
+      val resp = invocation.getArgument(1, classOf[JoinGroupResponse])
+      response.set(resp)
+    }
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )

Review Comment:
   i'm surely missing something here - shouldn't the test be blocked here? i can't seem to find the other thread running 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1028803896


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -184,6 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
         case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
         case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
+          .exceptionally(handleError)

Review Comment:
   nit: i personally think it's neater to merge this with the line above, since all the other cases are single-lined



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026680653


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   Sorry If i missed it, but what was the reasoning of using futures instead of callbacks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023606759


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+    apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      apiVersion,
+      "client",
+      InetAddress.getLocalHost,
+      BufferSupplier.create()
+    )
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {

Review Comment:
   This test tests the logic of `joinGroup` method in the `GroupCoordinatorAdapter`. Given an input, it verifies the output. The tests in KafkaApisTests do not do this as they only verity that the adaptor get the expected input.
   
   The name seems pretty clear to me as the test tests the `joinGroup` method.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
-
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolType(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupProtocolType(version.asInstanceOf[Short])
-    }
-  }
+  def testHandleJoinGroupRequestFutureFailed(): Unit = {

Review Comment:
   That's right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023749684


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   why do you think so? is it because of the usage of `make`? the name looks quite reasonable to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023258771


##########
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java:
##########
@@ -28,9 +28,15 @@ public class JoinGroupResponse extends AbstractResponse {
 
     private final JoinGroupResponseData data;
 
-    public JoinGroupResponse(JoinGroupResponseData data) {
+    public JoinGroupResponse(JoinGroupResponseData data, short version) {
         super(ApiKeys.JOIN_GROUP);
         this.data = data;
+
+        // All versions prior to version 7 do not support nullable
+        // string for the protocol type. Empty string should be used.
+        if (version < 7 && data.protocolType() == null) {

Review Comment:
   Oh I see it used to be done here in KafkaApis:
   ```
           val protocolName = if (request.context.apiVersion() >= 7)
             joinResult.protocolName.orNull
           else
             joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023605151


##########
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java:
##########
@@ -28,9 +28,15 @@ public class JoinGroupResponse extends AbstractResponse {
 
     private final JoinGroupResponseData data;
 
-    public JoinGroupResponse(JoinGroupResponseData data) {
+    public JoinGroupResponse(JoinGroupResponseData data, short version) {
         super(ApiKeys.JOIN_GROUP);
         this.data = data;
+
+        // All versions prior to version 7 do not support nullable
+        // string for the protocol type. Empty string should be used.
+        if (version < 7 && data.protocolType() == null) {

Review Comment:
   Right it was there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021207545


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {

Review Comment:
   I have renames this one to `testJoinGroupProtocolTypeBackwardCompatibility`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023272558


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
-
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolType(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {

Review Comment:
   I'm not sure I follow here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023261160


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val protocolName = if (request.context.apiVersion() >= 7)
-          joinResult.protocolName.orNull
-        else
-          joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()

Review Comment:
   Looks like we've also simplified building the response.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val protocolName = if (request.context.apiVersion() >= 7)
-          joinResult.protocolName.orNull
-        else
-          joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()

Review Comment:
   Looks like we've also simplified building the response. Looks good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023204927


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -22,9 +22,8 @@ import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
-

Review Comment:
   nit: did we mean to remove this line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023604793


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
     assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-    val protocols = List(
-      ("first", "first".getBytes()),
-      ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
     )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
-    val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    createKafkaApis().handleJoinGroupRequest(
-      buildRequest(
-        new JoinGroupRequest.Builder(
-          new JoinGroupRequestData()
-            .setGroupId(groupId)
-            .setMemberId(memberId)
-            .setProtocolType(protocolType)
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-            .setSessionTimeoutMs(sessionTimeoutMs)
-            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-              protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
-                .setName(name).setMetadata(protocol)
-              }.iterator.asJava))
-        ).build()
-      ),
-      RequestLocal.withThreadConfinedCaching)
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(true),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(rebalanceTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      capturedProtocols.capture(),
-      any(),
-      any(),
-      any()
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
     )
-    val capturedProtocolsList = capturedProtocols.getValue
-    assertEquals(protocols.size, capturedProtocolsList.size)
-    protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
-      assertEquals(expectedName, name)
-      assertArrayEquals(expectedBytes, bytes)
-    }
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-    for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
-      testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-    }
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setMemberId("member")
+      .setGenerationId(0)
+      .setLeader("leader")
+      .setProtocolType("consumer")
+      .setProtocolName("range")
+
+    future.complete(expectedJoinGroupResponse)
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-    reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+    val joinGroupRequest = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")
+      .setRebalanceTimeoutMs(1000)
+      .setSessionTimeoutMs(2000)
+
+    val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      version,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
 
-    val groupId = "group"
-    val memberId = "member1"
-    val protocolType = "consumer"
-    val rebalanceTimeoutMs = 10
-    val sessionTimeoutMs = 5
+    val expectedJoinGroupRequest = new JoinGroupRequestData()
+      .setGroupId(joinGroupRequest.groupId)
+      .setMemberId(joinGroupRequest.memberId)
+      .setProtocolType(joinGroupRequest.protocolType)
+      .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+      .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-    val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
+    val future = new CompletableFuture[JoinGroupResponseData]()
+    when(newGroupCoordinator.joinGroup(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(expectedJoinGroupRequest)
+    )).thenReturn(future)
 
-    val joinGroupRequest = new JoinGroupRequest.Builder(
-      new JoinGroupRequestData()
-        .setGroupId(groupId)
-        .setMemberId(memberId)
-        .setProtocolType(protocolType)
-        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
-        .setSessionTimeoutMs(sessionTimeoutMs)
-    ).build(version)
+    createKafkaApis().handleJoinGroupRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
 
-    val requestChannelRequest = buildRequest(joinGroupRequest)
+    val joinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
 
-    createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
-
-    verify(groupCoordinator).handleJoinGroup(
-      ArgumentMatchers.eq(groupId),
-      ArgumentMatchers.eq(memberId),
-      ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(if (version >= 4) true else false),
-      ArgumentMatchers.eq(if (version >= 9) true else false),
-      ArgumentMatchers.eq(clientId),
-      ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-      ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
-      ArgumentMatchers.eq(sessionTimeoutMs),
-      ArgumentMatchers.eq(protocolType),
-      ArgumentMatchers.eq(List.empty),
-      capturedCallback.capture(),
-      any(),
-      any()
-    )
-    capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+    val expectedJoinGroupResponse = new JoinGroupResponseData()
+      .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
+      .setMemberId("member")
+      .setProtocolType(if (version >= 7) null else GroupCoordinator.NoProtocol)
 
+    future.complete(joinGroupResponse)
     val capturedResponse = verifyNoThrottling(requestChannelRequest)
     val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
-
-    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
-    assertEquals(0, response.data.members.size)
-    assertEquals(memberId, response.data.memberId)
-    assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
-    assertEquals(GroupCoordinator.NoLeader, response.data.leader)
-    assertNull(response.data.protocolType)
-
-    if (version >= 7) {
-      assertNull(response.data.protocolName)
-    } else {
-      assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
-    }
+    assertEquals(expectedJoinGroupResponse, response.data)
   }
 
   @Test
-  def testJoinGroupProtocolType(): Unit = {

Review Comment:
   Sorry for the typo. I meant `now` instead of `not`, again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049080


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   A future is a better abstraction than a callback in my opinion but that is subjective. We could keep using callbacks as well. The end result in the same. The new Controller interface uses Futures as well so standardizing makes sense here.
   
   Regarding the current APIs, join group and sync group are definitely relying on callbacks. For the others, I don’t remember from the top of my head. In the new group coordinator, all of them will be executed in a different thread so they must be async.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049468


##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -21,11 +21,10 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, SocketTimeoutException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-

Review Comment:
   I will fix this on Monday. It seems that my IDE is doing this automatically somehow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023134907


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+public class GroupCoordinatorRequestContext {
+
+    private final short apiVersion;

Review Comment:
   Perhaps we should use `RequestHeader` like the other request context objects. It's a little weird to see the api version without the corresponding api key.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+public class GroupCoordinatorRequestContext {

Review Comment:
   I wonder if we could use the existing RequestContext object? Looks like the only thing missing is the buffer supplier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1030180438


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+public class GroupCoordinatorRequestContext {

Review Comment:
   That works except for the buffer supplier. I think that we can pass the buffer supplier as a regular argument instead of passing it via the context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026684714


##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -21,11 +21,10 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, SocketTimeoutException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-

Review Comment:
   sorry nit here again 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026687239


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+    apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      apiVersion,
+      "client",
+      InetAddress.getLocalHost,
+      BufferSupplier.create()
+    )
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {

Review Comment:
   Maybe I'm just not as well versed at reading mocked code, but that description above really helped a lot. Would it be possible to add comments in the test explaining this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026098011


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -161,6 +166,12 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Top-level method that handles all requests and multiplexes to the right api
    */
   override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+    def handleError(e: Throwable): Unit = {
+      trace(s"Unexpected error handling request ${request.requestDesc(true)} " +

Review Comment:
   it is a mistake. reverted to error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1027051707


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+    val joinGroupRequest = request.body[JoinGroupRequest]
 
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
     if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())

Review Comment:
   got it. thanks for the clarification



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac merged pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12845:
URL: https://github.com/apache/kafka/pull/12845


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1028449377


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(joinResult.protocolName.orNull)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava),
-          request.context.apiVersion
-        )
+  def handleJoinGroupRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   I don't have a strong opinion on callback vs future (maybe a slight bias for me towards callbacks because I'm more familiar, but not a good reason to change the design) but I do think it makes sense to be consistent. If we plan on using futures in the new controller interface then we can go ahead with using futures here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023741182


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -22,9 +22,8 @@ import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
-

Review Comment:
   Nope. Reverted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023263070


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+    apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      apiVersion,
+      "client",
+      InetAddress.getLocalHost,
+      BufferSupplier.create()
+    )
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    val ctx = makeContext(version)
+    val data = new JoinGroupRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setProtocolType("consumer")

Review Comment:
   Did we want to add a test for the null protocol?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023204927


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -22,9 +22,8 @@ import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
-

Review Comment:
   nit: did we mean to remove this line? There are a few other files that do this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023603444


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val protocolName = if (request.context.apiVersion() >= 7)
-          joinResult.protocolName.orNull
-        else
-          joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(protocolName)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava)
-        )
-
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
     if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
     } else if (!authHelper.authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) {
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else {
-      val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)

Review Comment:
   Yeah, I meant `now` instead of `not`. This code was moved 1:1 there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org