You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:52:27 UTC

[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r650434257



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       @dajac 
   That approach seem nicer, because keeps the change very narrow.
   Thanks for you comment/feedback. 
   Updated the PR. 
   
   To be completly honest I've been struggling all day with the unit test, because I am not being able to force a KafkaConsumer to start at an -1 offset to write an unit test (always starts at 0, which makes sense with OffsetAndMetada's _IllegalArgumentException("Invalid negative offset_). Dont know for what particular reason that consumer-group got on that condition:
   
   a) GroupID information (from describeConsumerGroups() method):
   (groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])
   
   b) Commited Offsets information (from getCommittedOffsets() method):
   Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)
   
   Whitout the change:
   ![image](https://user-images.githubusercontent.com/31544929/121788612-99c39300-cb9c-11eb-8aea-4d8ed1df97a9.png)
   
   With the change:
   ![image](https://user-images.githubusercontent.com/31544929/121788562-228dff00-cb9c-11eb-9c3f-985ef02d26e2.png)
   

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       I was stuck with DescribeConsumerGroupTest. You are right, looking at ConsumerGroupServiceTest it should do the trick. Would try that.
   Thanks for the tip




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org