Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/15 10:09:51 UTC

[GitHub] [kafka] dajac opened a new pull request, #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

dajac opened a new pull request, #12853:
URL: https://github.com/apache/kafka/pull/12853

   This patch adds `listGroups` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac merged pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12853:
URL: https://github.com/apache/kafka/pull/12853




[GitHub] [kafka] jolshan commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1028580865


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1618,34 +1618,36 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
     val listGroupsRequest = request.body[ListGroupsRequest]
-    val states = if (listGroupsRequest.data.statesFilter == null)
-      // Handle a null array the same as empty
-      immutable.Set[String]()
-    else
-      listGroupsRequest.data.statesFilter.asScala.toSet
 
-    def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
-       new ListGroupsResponse(new ListGroupsResponseData()
-            .setErrorCode(error.code)
-            .setGroups(groups.map { group =>
-                val listedGroup = new ListGroupsResponseData.ListedGroup()
-                  .setGroupId(group.groupId)
-                  .setProtocolType(group.protocolType)
-                  .setGroupState(group.state)
-                listedGroup
-            }.asJava)
-            .setThrottleTimeMs(throttleMs)
-        )
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   Same here with the trace logging as in the other PR.





[GitHub] [kafka] dajac commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1036297112


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1618,34 +1618,36 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
     val listGroupsRequest = request.body[ListGroupsRequest]
-    val states = if (listGroupsRequest.data.statesFilter == null)
-      // Handle a null array the same as empty
-      immutable.Set[String]()
-    else
-      listGroupsRequest.data.statesFilter.asScala.toSet
 
-    def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
-       new ListGroupsResponse(new ListGroupsResponseData()
-            .setErrorCode(error.code)
-            .setGroups(groups.map { group =>
-                val listedGroup = new ListGroupsResponseData.ListedGroup()
-                  .setGroupId(group.groupId)
-                  .setProtocolType(group.protocolType)
-                  .setGroupState(group.state)
-                listedGroup
-            }.asJava)
-            .setThrottleTimeMs(throttleMs)
-        )
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   Totally. Will do when that PR is merged.





[GitHub] [kafka] jolshan commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1036279657


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1618,34 +1618,36 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
     val listGroupsRequest = request.body[ListGroupsRequest]
-    val states = if (listGroupsRequest.data.statesFilter == null)
-      // Handle a null array the same as empty
-      immutable.Set[String]()
-    else
-      listGroupsRequest.data.statesFilter.asScala.toSet
 
-    def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
-       new ListGroupsResponse(new ListGroupsResponseData()
-            .setErrorCode(error.code)
-            .setGroups(groups.map { group =>
-                val listedGroup = new ListGroupsResponseData.ListedGroup()
-                  .setGroupId(group.groupId)
-                  .setProtocolType(group.protocolType)
-                  .setGroupState(group.state)
-                listedGroup
-            }.asJava)
-            .setThrottleTimeMs(throttleMs)
-        )
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   We will probably want to use the helper from the SyncGroup PR here too.





[GitHub] [kafka] jolshan commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1036316154


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -135,4 +134,48 @@ class GroupCoordinatorAdapterTest {
     assertEquals(expectedData, future.get())
   }
 
+  @Test
+  def testListGroups(): Unit = {
+    testListGroups(null, Set.empty)
+    testListGroups(List(), Set.empty)
+    testListGroups(List("Stable"), Set("Stable"))
+  }
+
+  def testListGroups(
+    statesFilter: List[String],
+    expectedStatesFilter: Set[String]
+  ): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    val ctx = makeContext(ApiKeys.LIST_GROUPS.latestVersion)
+    val data = new ListGroupsRequestData()
+      .setStatesFilter(statesFilter.asJava)
+
+    when(groupCoordinator.handleListGroups(expectedStatesFilter)).thenReturn {
+      (Errors.NOT_COORDINATOR, List(
+        GroupOverview("group1", "protocol1", "Stable"),
+        GroupOverview("group2", "qwerty", "Empty")

Review Comment:
   Hmm, I think I'm a bit confused by your last sentence.
   To clarify my understanding: the filtering is done within the `handleListGroups` method, but since this test mocks the result, we don't actually filter the groups?
   
   And the different filters at the top just check that the empty state filter is handled correctly?





[GitHub] [kafka] jolshan commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1028583013


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1618,34 +1618,36 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
     val listGroupsRequest = request.body[ListGroupsRequest]
-    val states = if (listGroupsRequest.data.statesFilter == null)
-      // Handle a null array the same as empty
-      immutable.Set[String]()
-    else
-      listGroupsRequest.data.statesFilter.asScala.toSet
 
-    def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
-       new ListGroupsResponse(new ListGroupsResponseData()
-            .setErrorCode(error.code)
-            .setGroups(groups.map { group =>
-                val listedGroup = new ListGroupsResponseData.ListedGroup()
-                  .setGroupId(group.groupId)
-                  .setProtocolType(group.protocolType)
-                  .setGroupState(group.state)
-                listedGroup
-            }.asJava)
-            .setThrottleTimeMs(throttleMs)
-        )
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
-    val (error, groups) = groupCoordinator.handleListGroups(states)
-    if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME, logIfDenied = false))
-      // With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        createResponse(requestThrottleMs, groups, error))
-    else {
-      val filteredGroups = groups.filter(group => authHelper.authorize(request.context, DESCRIBE, GROUP, group.groupId, logIfDenied = false))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        createResponse(requestThrottleMs, filteredGroups, error))
+
+    val hasClusterDescribe = authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME, logIfDenied = false)
+    val ctx = makeGroupCoordinatorRequestContextFrom(request, RequestLocal.NoCaching)
+    newGroupCoordinator.listGroups(ctx, listGroupsRequest.data).handle[Unit] { (response, exception) =>
+      if (exception != null) {
+        sendResponse(listGroupsRequest.getErrorResponse(exception))
+      } else {
+        val listGroupsResponse = if (hasClusterDescribe) {
+          // With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
+          new ListGroupsResponse(response)
+        } else {
+          // Otherwise, only groups with described group are returned.
+          val iterator = response.groups.iterator()

Review Comment:
   What was the reasoning for switching from the filter method to this approach? 





[GitHub] [kafka] dajac commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1035747744


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -135,4 +134,48 @@ class GroupCoordinatorAdapterTest {
     assertEquals(expectedData, future.get())
   }
 
+  @Test
+  def testListGroups(): Unit = {
+    testListGroups(null, Set.empty)
+    testListGroups(List(), Set.empty)
+    testListGroups(List("Stable"), Set("Stable"))
+  }
+
+  def testListGroups(
+    statesFilter: List[String],
+    expectedStatesFilter: Set[String]
+  ): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    val ctx = makeContext(ApiKeys.LIST_GROUPS.latestVersion)
+    val data = new ListGroupsRequestData()
+      .setStatesFilter(statesFilter.asJava)
+
+    when(groupCoordinator.handleListGroups(expectedStatesFilter)).thenReturn {
+      (Errors.NOT_COORDINATOR, List(
+        GroupOverview("group1", "protocol1", "Stable"),
+        GroupOverview("group2", "qwerty", "Empty")

Review Comment:
   I agree that this is a tad weird. However, the filtering is not done by `GroupCoordinatorAdapter`, so the test only verifies that inputs and outputs are correctly translated here. In practice, we would not get both an error and a list of groups.
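
For illustration, here is a minimal sketch of the input translation this test exercises. It assumes the adapter treats a null `statesFilter` the same as an empty one before delegating, mirroring the code removed from `KafkaApis` in this PR. `StatesFilterSketch` and `toStatesSet` are hypothetical names, not part of the Kafka code base, and the import assumes Scala 2.13.

```scala
import scala.jdk.CollectionConverters._

object StatesFilterSketch {
  // Hypothetical helper: handle a null filter the same as an empty one,
  // otherwise convert the Java list into an immutable Scala set.
  def toStatesSet(statesFilter: java.util.List[String]): Set[String] =
    if (statesFilter == null) Set.empty[String]
    else statesFilter.asScala.toSet
}
```

Under these assumptions, the three cases in `testListGroups` (null, empty list, `List("Stable")`) would map to `Set.empty`, `Set.empty`, and `Set("Stable")`; the groups returned by the mock are not filtered at this layer.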





[GitHub] [kafka] jolshan commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1028596112


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -135,4 +134,48 @@ class GroupCoordinatorAdapterTest {
     assertEquals(expectedData, future.get())
   }
 
+  @Test
+  def testListGroups(): Unit = {
+    testListGroups(null, Set.empty)
+    testListGroups(List(), Set.empty)
+    testListGroups(List("Stable"), Set("Stable"))
+  }
+
+  def testListGroups(
+    statesFilter: List[String],
+    expectedStatesFilter: Set[String]
+  ): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    val ctx = makeContext(ApiKeys.LIST_GROUPS.latestVersion)
+    val data = new ListGroupsRequestData()
+      .setStatesFilter(statesFilter.asJava)
+
+    when(groupCoordinator.handleListGroups(expectedStatesFilter)).thenReturn {
+      (Errors.NOT_COORDINATOR, List(
+        GroupOverview("group1", "protocol1", "Stable"),
+        GroupOverview("group2", "qwerty", "Empty")

Review Comment:
   Should we be including the empty group if the filter is stable?





[GitHub] [kafka] dajac commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1035743821


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1618,34 +1618,36 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
     val listGroupsRequest = request.body[ListGroupsRequest]
-    val states = if (listGroupsRequest.data.statesFilter == null)
-      // Handle a null array the same as empty
-      immutable.Set[String]()
-    else
-      listGroupsRequest.data.statesFilter.asScala.toSet
 
-    def createResponse(throttleMs: Int, groups: List[GroupOverview], error: Errors): AbstractResponse = {
-       new ListGroupsResponse(new ListGroupsResponseData()
-            .setErrorCode(error.code)
-            .setGroups(groups.map { group =>
-                val listedGroup = new ListGroupsResponseData.ListedGroup()
-                  .setGroupId(group.groupId)
-                  .setProtocolType(group.protocolType)
-                  .setGroupState(group.state)
-                listedGroup
-            }.asJava)
-            .setThrottleTimeMs(throttleMs)
-        )
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
-    val (error, groups) = groupCoordinator.handleListGroups(states)
-    if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME, logIfDenied = false))
-      // With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        createResponse(requestThrottleMs, groups, error))
-    else {
-      val filteredGroups = groups.filter(group => authHelper.authorize(request.context, DESCRIBE, GROUP, group.groupId, logIfDenied = false))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        createResponse(requestThrottleMs, filteredGroups, error))
+
+    val hasClusterDescribe = authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME, logIfDenied = false)
+    val ctx = makeGroupCoordinatorRequestContextFrom(request, RequestLocal.NoCaching)
+    newGroupCoordinator.listGroups(ctx, listGroupsRequest.data).handle[Unit] { (response, exception) =>
+      if (exception != null) {
+        sendResponse(listGroupsRequest.getErrorResponse(exception))
+      } else {
+        val listGroupsResponse = if (hasClusterDescribe) {
+          // With describe cluster access all groups are returned. We keep this alternative for backward compatibility.
+          new ListGroupsResponse(response)
+        } else {
+          // Otherwise, only groups with described group are returned.
+          val iterator = response.groups.iterator()

Review Comment:
   I am actually not sure why I did that. Let me revert to the previous method.
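
For reference, the two approaches under discussion can be sketched side by side. This is a simplified stand-in rather than the PR code: `ListedGroup`, `isAuthorized`, `filterCopy`, and `filterInPlace` are hypothetical, and the authorization check is reduced to a plain predicate on the group id. The import assumes Scala 2.13.

```scala
import java.util.{List => JList}
import scala.jdk.CollectionConverters._

object GroupFilterSketch {
  // Hypothetical stand-in for ListGroupsResponseData.ListedGroup.
  final case class ListedGroup(groupId: String)

  // Filter approach: builds a new list and leaves the input untouched.
  def filterCopy(groups: JList[ListedGroup], isAuthorized: String => Boolean): JList[ListedGroup] =
    groups.asScala.filter(g => isAuthorized(g.groupId)).asJava

  // Iterator approach: removes unauthorized entries from the input in place.
  def filterInPlace(groups: JList[ListedGroup], isAuthorized: String => Boolean): Unit = {
    val iterator = groups.iterator()
    while (iterator.hasNext) {
      if (!isAuthorized(iterator.next().groupId)) iterator.remove()
    }
  }
}
```

The in-place variant avoids allocating a second list but mutates the response it was given, and it requires a list that supports `Iterator.remove`; the filter variant is shorter and has no side effects on its input.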





[GitHub] [kafka] dajac commented on a diff in pull request #12853: KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12853:
URL: https://github.com/apache/kafka/pull/12853#discussion_r1036333752


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -135,4 +134,48 @@ class GroupCoordinatorAdapterTest {
     assertEquals(expectedData, future.get())
   }
 
+  @Test
+  def testListGroups(): Unit = {
+    testListGroups(null, Set.empty)
+    testListGroups(List(), Set.empty)
+    testListGroups(List("Stable"), Set("Stable"))
+  }
+
+  def testListGroups(
+    statesFilter: List[String],
+    expectedStatesFilter: Set[String]
+  ): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+    val ctx = makeContext(ApiKeys.LIST_GROUPS.latestVersion)
+    val data = new ListGroupsRequestData()
+      .setStatesFilter(statesFilter.asJava)
+
+    when(groupCoordinator.handleListGroups(expectedStatesFilter)).thenReturn {
+      (Errors.NOT_COORDINATOR, List(
+        GroupOverview("group1", "protocol1", "Stable"),
+        GroupOverview("group2", "qwerty", "Empty")

Review Comment:
   That’s right.


