Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/14 14:44:15 UTC

[GitHub] [kafka] VinceMu opened a new pull request #8666: KAFKA-9479 Describe consumer group --state --all-groups show header once

VinceMu opened a new pull request #8666:
URL: https://github.com/apache/kafka/pull/8666


   Used the [previous PR](https://github.com/apache/kafka/pull/8096) made by vetler as a starting point.
   
   Updated the `printStates()` method in `ConsumerGroupCommand` to print the header only once when the options `--describe --state --all-groups` are set.
   
   Modified the `testDescribeAllExistingGroups` test in `DescribeConsumerGroupTest` to account for the case where the header is printed only once. In this case, the number of lines is equal to the length of DescribeTypes + 1.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] VinceMu commented on a change in pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

Posted by GitBox <gi...@apache.org>.
VinceMu commented on a change in pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#discussion_r435218619



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-          val coordinatorColLen = Math.max(25, coordinator.length)
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-          println()
+          (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
         }
+      val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+      if(stateProps.nonEmpty && hasAllGroups){
+        val headerLengthOffset = Math.max(25,stateProps.maxBy{_._2.length}._2.length)
+        print(s"\n%${-headerLengthOffset}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))

Review comment:
       `print` calls with an explicit '\n' are used throughout ConsumerGroupCommand. My guess is that this is because `println` uses `System.lineSeparator`, which may not be '\n' on every operating system. Switching to `println` would also mean modifying the tests, which count lines by splitting on '\n'. For these reasons I'm hesitant to switch over to `println`. Thoughts?
   
   The idea of using the coordinator length as an offset for formatting existed before this PR. The change I've now implemented is that, instead of calculating the offset length per group, we calculate it only once by taking the largest offset across all groups. 
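
To make the line-counting concern concrete, here is a minimal sketch of how a test that splits captured output on '\n' behaves when the output is written with explicit '\n' characters. `LineCountSketch`, `capture`, `countLines` and `printTable` are hypothetical names for illustration only; they are not taken from ConsumerGroupCommand or its tests.

```scala
import java.io.ByteArrayOutputStream

object LineCountSketch {
  // Capture everything printed to stdout while `body` runs.
  def capture(body: => Unit): String = {
    val buffer = new ByteArrayOutputStream()
    Console.withOut(buffer) {
      body
      Console.out.flush()
    }
    buffer.toString("UTF-8")
  }

  // Count non-empty lines the way a test splitting on '\n' would.
  def countLines(output: String): Int =
    output.split("\n").count(_.trim.nonEmpty)

  // Hypothetical stand-in for the table: one header plus one row per group.
  // Every line break is an explicit '\n', independent of the platform.
  def printTable(groups: Seq[String]): Unit = {
    print("\n%-25s %s".format("GROUP", "STATE"))
    groups.foreach(g => print("\n%-25s %s".format(g, "Stable")))
    print("\n")
  }
}
```

With three groups, `countLines(capture(printTable(Seq("g1", "g2", "g3"))))` gives one header line plus three rows. A `println`-based version would emit the platform line separator instead, so a test written this way would depend on where it runs.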







[GitHub] [kafka] VinceMu commented on a change in pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

Posted by GitBox <gi...@apache.org>.
VinceMu commented on a change in pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#discussion_r435213295



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-          val coordinatorColLen = Math.max(25, coordinator.length)
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-          println()
+          (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
         }
+      val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+      if(stateProps.nonEmpty && hasAllGroups){
+        val headerLengthOffset = Math.max(25,stateProps.maxBy{_._2.length}._2.length)
+        print(s"\n%${-headerLengthOffset}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
+      }
+      stateProps.foreach{ case(group,coordinator,assignmentStrategy,state,numMembers)=>
+        val offset = -Math.max(25,coordinator.length)

Review comment:
       Done this :) 







[GitHub] [kafka] dajac commented on a change in pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#discussion_r434554562



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"

Review comment:
       I suggest extracting this into a method `coordinatorString` in the `GroupState` case class or into an inner function.
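
A rough sketch of what that extraction could look like. The `Coordinator` case class below is a simplified, hypothetical stand-in for the node type used in the real code, and `GroupState` here only carries the fields visible in the diff above; the actual class may differ.

```scala
object CoordinatorStringSketch {
  // Hypothetical stand-in for the broker node type used by the real code.
  final case class Coordinator(host: String, port: Int, idString: String)

  // Simplified GroupState: only the fields that appear in the diff above.
  final case class GroupState(group: String,
                              coordinator: Coordinator,
                              assignmentStrategy: String,
                              state: String,
                              numMembers: Int) {
    // Single place that defines how the coordinator is rendered.
    def coordinatorString: String =
      s"${coordinator.host}:${coordinator.port} (${coordinator.idString})"
  }
}
```

Callers can then use `state.coordinatorString` both when measuring the column width and when printing the row, so the two cannot drift apart.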

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-          val coordinatorColLen = Math.max(25, coordinator.length)
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-          println()
+          (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
         }
+      val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+      if(stateProps.nonEmpty && hasAllGroups){
+        val headerLengthOffset = Math.max(25,stateProps.maxBy{_._2.length}._2.length)
+        print(s"\n%${-headerLengthOffset}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
+      }
+      stateProps.foreach{ case(group,coordinator,assignmentStrategy,state,numMembers)=>
+        val offset = -Math.max(25,coordinator.length)

Review comment:
       The offset computation is not consistent here: the header uses the maximum across all groups, while each row uses its own coordinator length. With multiple groups, this will misalign the group information. We should compute it once and reuse it everywhere.
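
One possible way to do this, as a sketch only: compute each column width a single time, over the header label and all rows, and build one format string that is shared by the header and every row. `AlignedTableSketch`, `Row` and `render` are hypothetical names, and the rows are reduced to three columns to keep the example short.

```scala
object AlignedTableSketch {
  // (group, coordinator, state) triples; a simplification of the tuples in the diff.
  type Row = (String, String, String)

  def render(rows: Seq[Row]): Seq[String] = {
    // Compute each column width once, over the header label and every row.
    // The group width comes from group names, the coordinator width from coordinators.
    val groupWidth = ("GROUP" +: rows.map(_._1)).map(_.length).max
    val coordWidth = ("COORDINATOR (ID)" +: rows.map(_._2)).map(_.length).max
    // Negative widths left-justify; the same format string serves header and rows.
    val fmt = s"%${-groupWidth}s %${-coordWidth}s %s"
    fmt.format("GROUP", "COORDINATOR (ID)", "STATE") +:
      rows.map { case (g, c, s) => fmt.format(g, c, s) }
  }
}
```

Because the header and the rows go through the same `fmt`, the last column starts at the same position on every line regardless of how long an individual group name or coordinator string is.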

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-          val coordinatorColLen = Math.max(25, coordinator.length)
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-          println()
+          (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
         }
+      val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+      if(stateProps.nonEmpty && hasAllGroups){

Review comment:
       Why do you distinguish the all-groups case here? It seems this was not the case before.

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-          val coordinatorColLen = Math.max(25, coordinator.length)
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-          println()
+          (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
         }
+      val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+      if(stateProps.nonEmpty && hasAllGroups){
+        val headerLengthOffset = Math.max(25,stateProps.maxBy{_._2.length}._2.length)
+        print(s"\n%${-headerLengthOffset}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))

Review comment:
       We could use `println` and get rid of the `\n` here. Also, it seems that `headerLengthOffset` is computed based on the coordinator string but is used to format the group.

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}

Review comment:
       I wonder if we shouldn't simply get rid of `shouldPrintMemberState` and print out all the groups in the table. `shouldPrintMemberState` seems to log some info based on the state of the group. I think that the state would be self-explanatory in the table. Would this make sense?







[GitHub] [kafka] VinceMu commented on pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

Posted by GitBox <gi...@apache.org>.
VinceMu commented on pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#issuecomment-646962690


   @dajac just following up on this PR, I've addressed your feedback. Ready for a re-review.





[GitHub] [kafka] VinceMu commented on a change in pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

Posted by GitBox <gi...@apache.org>.
VinceMu commented on a change in pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#discussion_r435211667



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}

Review comment:
       I wanted to avoid changing any existing behaviour with this PR. In light of this, could we print all the groups regardless of their state when the `--all-groups` flag is given? 
   







[GitHub] [kafka] VinceMu commented on a change in pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

Posted by GitBox <gi...@apache.org>.
VinceMu commented on a change in pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#discussion_r435212607



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-          val coordinatorColLen = Math.max(25, coordinator.length)
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-          println()
+          (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
         }
+      val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+      if(stateProps.nonEmpty && hasAllGroups){

Review comment:
       KAFKA-9479 entails only printing the header once when the `all-groups` flag is present. 







[GitHub] [kafka] VinceMu commented on a change in pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

Posted by GitBox <gi...@apache.org>.
VinceMu commented on a change in pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#discussion_r435213295



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-          val coordinatorColLen = Math.max(25, coordinator.length)
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-          print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-          println()
+          (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
         }
+      val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+      if(stateProps.nonEmpty && hasAllGroups){
+        val headerLengthOffset = Math.max(25,stateProps.maxBy{_._2.length}._2.length)
+        print(s"\n%${-headerLengthOffset}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
+      }
+      stateProps.foreach{ case(group,coordinator,assignmentStrategy,state,numMembers)=>
+        val offset = -Math.max(25,coordinator.length)

Review comment:
       Thanks for catching that. The offset is now computed only once.







[GitHub] [kafka] VinceMu commented on a change in pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

Posted by GitBox <gi...@apache.org>.
VinceMu commented on a change in pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#discussion_r435211987



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
     }
 
     private def printStates(states: Map[String, GroupState]): Unit = {
-      for ((groupId, state) <- states) {
-        if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+      val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+        .map{case (_,state)=>
           val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"

Review comment:
       Thanks for the suggestion, I've extracted this into the `GroupState` case class. 



