Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/15 20:05:22 UTC

[GitHub] [kafka] dajac opened a new pull request, #12858: KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface

dajac opened a new pull request, #12858:
URL: https://github.com/apache/kafka/pull/12858

   This patch adds `deleteGroups` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
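
The tests quoted later in this thread suggest the general flow: the request handler calls `deleteGroups`, receives a `CompletableFuture`, and if that future fails it reports the same error for every requested group. A minimal, self-contained sketch of that flow follows. `GroupResult`, `CoordinatorStub`, `handle`, and `errorCodeFor` are hypothetical stand-ins introduced for illustration, not the actual Kafka types; the real method also takes a request context.

```scala
import java.util.concurrent.CompletableFuture

object DeleteGroupsFlowSketch {
  // Hypothetical stand-in for DeleteGroupsResponseData.DeletableGroupResult.
  final case class GroupResult(groupId: String, errorCode: Short)

  // Hypothetical, simplified shape of the coordinator call (assumption:
  // the request context parameter is omitted to keep the sketch small).
  trait CoordinatorStub {
    def deleteGroups(groupIds: List[String]): CompletableFuture[List[GroupResult]]
  }

  // Pass successful results through unchanged; if the future failed,
  // build one result per requested group carrying the same error code.
  def handle(
    coordinator: CoordinatorStub,
    groupIds: List[String],
    errorCodeFor: Throwable => Short
  ): CompletableFuture[List[GroupResult]] =
    coordinator.deleteGroups(groupIds).handle[List[GroupResult]] { (results, exception) =>
      if (exception != null) groupIds.map(id => GroupResult(id, errorCodeFor(exception)))
      else results
    }
}
```

Mapping the failure once per requested group mirrors what `testHandleDeleteGroupsFutureFailed` expects, where all three groups carry the same error code.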
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #12858: KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
OmniaGM commented on code in PR #12858:
URL: https://github.com/apache/kafka/pull/12858#discussion_r1030667864


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1992,6 +1992,186 @@ class KafkaApisTest {
     testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
   }
 
+  @Test
+  def testHandleDeleteGroups(): Unit = {
+    val deleteGroupsRequest = new DeleteGroupsRequestData()
+      .setGroupsNames(List(
+        "group-1",
+        "group-2",
+        "group-3"
+      ).asJava)
+
+    val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.DELETE_GROUPS.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
+    when(newGroupCoordinator.deleteGroups(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(List("group-1", "group-2", "group-3").asJava)
+    )).thenReturn(future)
+
+    createKafkaApis().handleDeleteGroupsRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val results = new DeleteGroupsResponseData.DeletableGroupResultCollection(List(
+      new DeleteGroupsResponseData.DeletableGroupResult()
+        .setGroupId("group-1")
+        .setErrorCode(Errors.NONE.code),
+      new DeleteGroupsResponseData.DeletableGroupResult()
+        .setGroupId("group-2")
+        .setErrorCode(Errors.NOT_CONTROLLER.code),
+      new DeleteGroupsResponseData.DeletableGroupResult()
+        .setGroupId("group-3")
+        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code),
+    ).iterator.asJava)
+
+    future.complete(results)
+
+    val expectedDeleteGroupsResponse = new DeleteGroupsResponseData()
+      .setResults(results)
+
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[DeleteGroupsResponse]
+    assertEquals(expectedDeleteGroupsResponse, response.data)
+  }
+
+  @Test
+  def testHandleDeleteGroupsFutureFailed(): Unit = {
+    val deleteGroupsRequest = new DeleteGroupsRequestData()
+      .setGroupsNames(List(
+        "group-1",
+        "group-2",
+        "group-3"
+      ).asJava)
+
+    val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.DELETE_GROUPS.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
+    when(newGroupCoordinator.deleteGroups(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(List("group-1", "group-2", "group-3").asJava)
+    )).thenReturn(future)
+
+    createKafkaApis().handleDeleteGroupsRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    future.completeExceptionally(Errors.NOT_CONTROLLER.exception)
+
+    val expectedDeleteGroupsResponse = new DeleteGroupsResponseData()
+      .setResults(new DeleteGroupsResponseData.DeletableGroupResultCollection(List(

Review Comment:
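   Same here: the helper suggested for the results list in `testHandleDeleteGroups` would apply to this expected response as well.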





[GitHub] [kafka] OmniaGM commented on a diff in pull request #12858: KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
OmniaGM commented on code in PR #12858:
URL: https://github.com/apache/kafka/pull/12858#discussion_r1030666809


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1992,6 +1992,186 @@ class KafkaApisTest {
     testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
   }
 
+  @Test
+  def testHandleDeleteGroups(): Unit = {
+    val deleteGroupsRequest = new DeleteGroupsRequestData()
+      .setGroupsNames(List(
+        "group-1",
+        "group-2",
+        "group-3"
+      ).asJava)
+
+    val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.DELETE_GROUPS.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
+    when(newGroupCoordinator.deleteGroups(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(List("group-1", "group-2", "group-3").asJava)
+    )).thenReturn(future)
+
+    createKafkaApis().handleDeleteGroupsRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val results = new DeleteGroupsResponseData.DeletableGroupResultCollection(List(

Review Comment:
   One small suggestion: several of these test cases build a `List[DeletableGroupResult]` by hand. To simplify the test code a bit, we could add a helper method that converts a `Map[String, Errors]` (keyed by group ID) into a `List[DeletableGroupResult]`.
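
A minimal sketch of what such a helper might look like. To keep it self-contained, `StubError` and `StubGroupResult` are hypothetical stand-ins for `Errors` and `DeleteGroupsResponseData.DeletableGroupResult`, and `toResults` is an illustrative name, not something from the patch.

```scala
object DeleteGroupsHelperSketch {
  // Hypothetical stand-ins so the sketch compiles without Kafka on the classpath.
  final case class StubError(code: Short)
  final case class StubGroupResult(groupId: String, errorCode: Short)

  // Placeholder error values for illustration only.
  val NoError: StubError = StubError(0.toShort)
  val SomeError: StubError = StubError(1.toShort)

  // Convert a map keyed by group ID into one result per entry.
  // The output follows the map's iteration order, which a plain
  // immutable Map does not guarantee to be insertion order.
  def toResults(errorsByGroup: Map[String, StubError]): List[StubGroupResult] =
    errorsByGroup.map { case (groupId, error) =>
      StubGroupResult(groupId, error.code)
    }.toList
}
```

A call such as `toResults(Map("group-1" -> NoError, "group-2" -> SomeError))` would replace the repeated `new DeletableGroupResult().setGroupId(...).setErrorCode(...)` chains in the tests.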





[GitHub] [kafka] dajac merged pull request #12858: KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12858:
URL: https://github.com/apache/kafka/pull/12858




[GitHub] [kafka] OmniaGM commented on a diff in pull request #12858: KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
OmniaGM commented on code in PR #12858:
URL: https://github.com/apache/kafka/pull/12858#discussion_r1030668387


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1992,6 +1992,186 @@ class KafkaApisTest {
     testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
   }
 
+  @Test
+  def testHandleDeleteGroups(): Unit = {
+    val deleteGroupsRequest = new DeleteGroupsRequestData()
+      .setGroupsNames(List(
+        "group-1",
+        "group-2",
+        "group-3"
+      ).asJava)
+
+    val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.DELETE_GROUPS.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
+    when(newGroupCoordinator.deleteGroups(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(List("group-1", "group-2", "group-3").asJava)
+    )).thenReturn(future)
+
+    createKafkaApis().handleDeleteGroupsRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val results = new DeleteGroupsResponseData.DeletableGroupResultCollection(List(
+      new DeleteGroupsResponseData.DeletableGroupResult()
+        .setGroupId("group-1")
+        .setErrorCode(Errors.NONE.code),
+      new DeleteGroupsResponseData.DeletableGroupResult()
+        .setGroupId("group-2")
+        .setErrorCode(Errors.NOT_CONTROLLER.code),
+      new DeleteGroupsResponseData.DeletableGroupResult()
+        .setGroupId("group-3")
+        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code),
+    ).iterator.asJava)
+
+    future.complete(results)
+
+    val expectedDeleteGroupsResponse = new DeleteGroupsResponseData()
+      .setResults(results)
+
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[DeleteGroupsResponse]
+    assertEquals(expectedDeleteGroupsResponse, response.data)
+  }
+
+  @Test
+  def testHandleDeleteGroupsFutureFailed(): Unit = {
+    val deleteGroupsRequest = new DeleteGroupsRequestData()
+      .setGroupsNames(List(
+        "group-1",
+        "group-2",
+        "group-3"
+      ).asJava)
+
+    val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.DELETE_GROUPS.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
+    when(newGroupCoordinator.deleteGroups(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(List("group-1", "group-2", "group-3").asJava)
+    )).thenReturn(future)
+
+    createKafkaApis().handleDeleteGroupsRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    future.completeExceptionally(Errors.NOT_CONTROLLER.exception)
+
+    val expectedDeleteGroupsResponse = new DeleteGroupsResponseData()
+      .setResults(new DeleteGroupsResponseData.DeletableGroupResultCollection(List(
+        new DeleteGroupsResponseData.DeletableGroupResult()
+          .setGroupId("group-1")
+          .setErrorCode(Errors.NOT_CONTROLLER.code),
+        new DeleteGroupsResponseData.DeletableGroupResult()
+          .setGroupId("group-2")
+          .setErrorCode(Errors.NOT_CONTROLLER.code),
+        new DeleteGroupsResponseData.DeletableGroupResult()
+          .setGroupId("group-3")
+          .setErrorCode(Errors.NOT_CONTROLLER.code),
+      ).iterator.asJava))
+
+    val capturedResponse = verifyNoThrottling(requestChannelRequest)
+    val response = capturedResponse.getValue.asInstanceOf[DeleteGroupsResponse]
+    assertEquals(expectedDeleteGroupsResponse, response.data)
+  }
+
+  @Test
+  def testHandleDeleteGroupsAuthenticationFailed(): Unit = {
+    val deleteGroupsRequest = new DeleteGroupsRequestData()
+      .setGroupsNames(List(
+        "group-1",
+        "group-2",
+        "group-3"
+      ).asJava)
+
+    val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+
+    def makeDeleteAction(groupId: String): Action = {
+      new Action(
+        AclOperation.DELETE,
+        new ResourcePattern(ResourceType.GROUP, groupId, PatternType.LITERAL),
+        1,
+        true,
+        true
+      )
+    }
+
+    when(authorizer.authorize(
+      any[RequestContext],
+      ArgumentMatchers.eq(Seq(
+        makeDeleteAction("group-3"),
+        makeDeleteAction("group-2"),
+        makeDeleteAction("group-1"),
+      ).asJava)
+    )).thenReturn(Seq(
+      AuthorizationResult.ALLOWED,
+      AuthorizationResult.ALLOWED,
+      AuthorizationResult.DENIED
+    ).asJava)
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.DELETE_GROUPS.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
+    when(newGroupCoordinator.deleteGroups(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(List("group-2", "group-3").asJava)
+    )).thenReturn(future)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteGroupsRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    future.complete(new DeleteGroupsResponseData.DeletableGroupResultCollection(List(
+      new DeleteGroupsResponseData.DeletableGroupResult()
+        .setGroupId("group-2")
+        .setErrorCode(Errors.NONE.code),
+      new DeleteGroupsResponseData.DeletableGroupResult()
+        .setGroupId("group-3")
+        .setErrorCode(Errors.NONE.code)
+    ).iterator.asJava))
+
+    val expectedDeleteGroupsResponse = new DeleteGroupsResponseData()
+      .setResults(new DeleteGroupsResponseData.DeletableGroupResultCollection(List(
+        new DeleteGroupsResponseData.DeletableGroupResult()

Review Comment:
   Same here as well: this expected response could also be built with the suggested helper.





[GitHub] [kafka] dajac commented on a diff in pull request #12858: KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12858:
URL: https://github.com/apache/kafka/pull/12858#discussion_r1045524817


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1992,6 +1992,186 @@ class KafkaApisTest {
     testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
   }
 
+  @Test
+  def testHandleDeleteGroups(): Unit = {
+    val deleteGroupsRequest = new DeleteGroupsRequestData()
+      .setGroupsNames(List(
+        "group-1",
+        "group-2",
+        "group-3"
+      ).asJava)
+
+    val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
+
+    val expectedRequestContext = new GroupCoordinatorRequestContext(
+      ApiKeys.DELETE_GROUPS.latestVersion,
+      requestChannelRequest.context.clientId,
+      requestChannelRequest.context.clientAddress,
+      RequestLocal.NoCaching.bufferSupplier
+    )
+
+    val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
+    when(newGroupCoordinator.deleteGroups(
+      ArgumentMatchers.eq(expectedRequestContext),
+      ArgumentMatchers.eq(List("group-1", "group-2", "group-3").asJava)
+    )).thenReturn(future)
+
+    createKafkaApis().handleDeleteGroupsRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val results = new DeleteGroupsResponseData.DeletableGroupResultCollection(List(

Review Comment:
   Thanks for your comment. I played with your suggestion a bit and found that a `Map` is not convenient here because the order matters. We could use a `LinkedHashMap`, but that makes things less readable in my opinion. I will stick with the current approach, which is not that bad; we do this everywhere.
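
To illustrate the ordering concern, here is a small self-contained sketch using only the Scala standard library. `OrderingSketch` and its group names are made up for the example. A `mutable.LinkedHashMap` iterates in insertion order, whereas a plain immutable `Map` makes no such promise once it grows beyond a few entries.

```scala
import scala.collection.mutable

object OrderingSketch {
  // Illustrative group IDs, in the order a request might list them.
  val groups: List[String] = (1 to 6).map(i => s"group-$i").toList

  // LinkedHashMap remembers insertion order, so iterating it yields
  // the groups in the same order they were added.
  val linked: mutable.LinkedHashMap[String, Short] =
    mutable.LinkedHashMap(groups.map(_ -> 0.toShort): _*)
  val linkedOrder: List[String] = linked.keysIterator.toList

  // A plain immutable Map holds the same entries, but its iteration
  // order is an implementation detail and should not be relied on.
  val plain: Map[String, Short] = groups.map(_ -> 0.toShort).toMap
}
```

An order-preserving map would therefore work, at the cost of the extra ceremony mentioned above.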


