You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/21 14:16:29 UTC

[GitHub] [kafka] dajac commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

dajac commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r412182105



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+    val overviews = List(
+      GroupOverview("group1", "protocol1", "Stable"),
+      GroupOverview("goupp2", "qwerty", "Empty")
+    )
+    val response = listGroupRequest(Option.empty, overviews)
+    assertEquals(2, response.data.groups.size)
+    assertEquals("", response.data.groups.get(0).groupState)
+    assertEquals("", response.data.groups.get(1).groupState)
+  }
+
+  @Test
+  def testListGroupsRequestWithState(): Unit = {
+    val overviews = List(
+      GroupOverview("group1", "protocol1", "Stable")
+    )
+    val response = listGroupRequest(Option.apply("Stable"), overviews)
+    assertEquals(1, response.data.groups.size)
+    assertEquals("Stable", response.data.groups.get(0).groupState)
+  }
+
+  private def listGroupRequest(state: Option[String], overviews: List[GroupOverview]): ListGroupsResponse = {
+    EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+    val data = new ListGroupsRequestData()
+    if (state.isDefined)
+      data.setStates(Collections.singletonList(state.get))
+    val listGroupsRequest = new ListGroupsRequest.Builder(data).build()
+    val requestChannelRequest = buildRequest(listGroupsRequest)
+
+    val capturedResponse = expectNoThrottling()
+    val expectedStates = if (state.isDefined) List(state.get) else List()
+    EasyMock.expect(groupCoordinator.handleListGroups(expectedStates))
+      .andReturn((Errors.NONE, overviews))
+    EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+    createKafkaApis().handleListGroupsRequest(requestChannelRequest)
+
+    val response = readResponse(ApiKeys.LIST_GROUPS, listGroupsRequest, capturedResponse).asInstanceOf[ListGroupsResponse]
+    assertEquals(Errors.NONE.code, response.data.errorCode)
+    return response

Review comment:
       nit: `return` can be omitted here.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+    val overviews = List(
+      GroupOverview("group1", "protocol1", "Stable"),
+      GroupOverview("goupp2", "qwerty", "Empty")
+    )
+    val response = listGroupRequest(Option.empty, overviews)

Review comment:
       nit: We tend to use `None` instead of `Option.empty`.

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -1024,6 +1071,11 @@ object ConsumerGroupCommand extends Logging {
         if (!options.has(groupOpt) && !options.has(allGroupsOpt))
           CommandLineUtils.printUsageAndDie(parser,
             s"Option $describeOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
+        val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt, offsetsOpt, stateOpt)
+        if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else 0).sum > 1) {
+          CommandLineUtils.printUsageAndDie(parser,
+            s"Option $describeOpt takes at most one of these options: $mutuallyExclusiveOpts")

Review comment:
        We should build a string here: `mutuallyExclusiveOpts.mkString(", ")`

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+    val overviews = List(
+      GroupOverview("group1", "protocol1", "Stable"),
+      GroupOverview("goupp2", "qwerty", "Empty")
+    )
+    val response = listGroupRequest(Option.empty, overviews)
+    assertEquals(2, response.data.groups.size)
+    assertEquals("", response.data.groups.get(0).groupState)
+    assertEquals("", response.data.groups.get(1).groupState)

Review comment:
       When I see this, I do wonder if it wouldn't be better to make the `GroupState` field in the response `nullable` and to set it to `null` when the state is not provided. Having to handle empty strings is a bit annoying. What do you think?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+    val overviews = List(
+      GroupOverview("group1", "protocol1", "Stable"),
+      GroupOverview("goupp2", "qwerty", "Empty")
+    )
+    val response = listGroupRequest(Option.empty, overviews)
+    assertEquals(2, response.data.groups.size)
+    assertEquals("", response.data.groups.get(0).groupState)
+    assertEquals("", response.data.groups.get(1).groupState)
+  }
+
+  @Test
+  def testListGroupsRequestWithState(): Unit = {
+    val overviews = List(
+      GroupOverview("group1", "protocol1", "Stable")
+    )
+    val response = listGroupRequest(Option.apply("Stable"), overviews)

Review comment:
       nit: We tend to use `Some("Stable")` instead of `Option.apply`.

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -84,6 +87,15 @@ object ConsumerGroupCommand extends Logging {
     }
   }
 
+  def consumerGroupStatesFromString(input: String): List[ConsumerGroupState] = {
+    val parsedStates = input.split(',').map(s => ConsumerGroupState.parse(s.trim.toLowerCase.capitalize)).toSet.toList
+    if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) {
+      val validStates = ConsumerGroupState.values().filter(_ != ConsumerGroupState.UNKNOWN)
+      throw new IllegalArgumentException(s"Invalid state list '$input'. Valid states are: ${validStates.mkString(",")}")
+    }
+    return parsedStates

Review comment:
       nit: `return` can be omitted here.

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -84,6 +87,15 @@ object ConsumerGroupCommand extends Logging {
     }
   }
 
+  def consumerGroupStatesFromString(input: String): List[ConsumerGroupState] = {
+    val parsedStates = input.split(',').map(s => ConsumerGroupState.parse(s.trim.toLowerCase.capitalize)).toSet.toList
+    if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) {
+      val validStates = ConsumerGroupState.values().filter(_ != ConsumerGroupState.UNKNOWN)
+      throw new IllegalArgumentException(s"Invalid state list '$input'. Valid states are: ${validStates.mkString(",")}")

Review comment:
       nit: add a space after `,`

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -178,12 +190,44 @@ object ConsumerGroupCommand extends Logging {
       } else None
     }
 
-    def listGroups(): List[String] = {
+    def listGroups(): Unit = {
+      if (opts.options.has(opts.stateOpt)) {
+           val stateValue = opts.options.valueOf(opts.stateOpt)
+           val states = if (stateValue == null || stateValue.isEmpty)
+             allStates
+           else
+             consumerGroupStatesFromString(stateValue)
+           val listings = listConsumerGroupsWithState(states)
+           printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)).toList)

Review comment:
       nit: I suppose that the last conversion `.toList` is not necessary.

##########
File path: core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
##########
@@ -52,11 +51,25 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     }
   }
 
-  @Test(expected = classOf[OptionException])
+  @Test
   def testDescribeWithMultipleSubActions(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    var exitStatus: Option[Int] = None
+    var exitMessage: Option[String] = None
+    Exit.setExitProcedure { (status, err) =>
+      exitStatus = Some(status)
+      exitMessage = err
+      throw new RuntimeException
+    }
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--members", "--state")
-    getConsumerGroupService(cgcArgs)
+    try {
+      ConsumerGroupCommand.main(cgcArgs)
+    } catch {
+      case e: RuntimeException => //expected
+    } finally {
+      Exit.resetExitProcedure()
+    }
+    assertEquals(Some(1), exitStatus)
+    assertTrue(exitMessage.get.contains("Option [describe] takes at most one of these options"))

Review comment:
       I didn't realize that `checkArgs` was exiting when an error occurs. The way we use `printUsageAndDie` everywhere is a bit annoying and makes testing hard. I think that we should throw proper exceptions and catch them in the main function instead. I suppose that this is something for another PR though...

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -84,6 +87,15 @@ object ConsumerGroupCommand extends Logging {
     }
   }
 
+  def consumerGroupStatesFromString(input: String): List[ConsumerGroupState] = {
+    val parsedStates = input.split(',').map(s => ConsumerGroupState.parse(s.trim.toLowerCase.capitalize)).toSet.toList
+    if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) {
+      val validStates = ConsumerGroupState.values().filter(_ != ConsumerGroupState.UNKNOWN)
+      throw new IllegalArgumentException(s"Invalid state list '$input'. Valid states are: ${validStates.mkString(",")}")

Review comment:
       I tried to run the tool to see how this behaves. It is a bit annoying that we are not consistent with the other command line argument errors.
   
   It looks like this:
   ```
   $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --state aa
   
   Error: Executing consumer group command failed due to Invalid state list 'aa'. Valid states are: PreparingRebalance,CompletingRebalance,Stable,Dead,Empty
   java.lang.IllegalArgumentException: Invalid state list 'aa'. Valid states are: PreparingRebalance,CompletingRebalance,Stable,Dead,Empty
   	at kafka.admin.ConsumerGroupCommand$.consumerGroupStatesFromString(ConsumerGroupCommand.scala:94)
   	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:199)
   	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:66)
   	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
   ```
   
   I have also noticed that we don't handle exceptions thrown when an argument is not recognized:
   ```
   $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --stat
   
   Exception in thread "main" joptsimple.UnrecognizedOptionException: stat is not a recognized option
   	at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
   	at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
   	at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
   	at joptsimple.OptionParser.parse(OptionParser.java:396)
   	at kafka.admin.ConsumerGroupCommand$ConsumerGroupCommandOptions.<init>(ConsumerGroupCommand.scala:1058)
   	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:51)
   	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
   ```
   
   Whereas, when we do handle things, we print an error and print out the help:
   ```
   $ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group bla --state --members
   
   Option [describe] takes at most one of these options: Set([members], [offsets], [state])
   Option                                  Description
   ------                                  -----------
   --all-groups                            Apply to all consumer groups.
   --all-topics                            Consider all topics assigned to a
                                             group in the `reset-offsets` process.
   --bootstrap-server <String: server to   REQUIRED: The server(s) to connect to.
     connect to>
   --by-duration <String: duration>        Reset offsets to offset by duration
                                             from current timestamp. Format:
                                             'PnDTnHnMnS'
   ...
   ```
   
   I wonder if we could catch `OptionException` and `IllegalArgumentException` in the main to print out a better error message for the end user.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org