Posted to dev@kafka.apache.org by "Ignacio Acuna (Jira)" <ji...@apache.org> on 2021/06/10 00:16:00 UTC

[jira] [Created] (KAFKA-12926) ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Ignacio Acuna created KAFKA-12926:
-------------------------------------

             Summary: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh
                 Key: KAFKA-12926
                 URL: https://issues.apache.org/jira/browse/KAFKA-12926
             Project: Kafka
          Issue Type: Bug
          Components: admin, clients
            Reporter: Ignacio Acuna
            Assignee: Ignacio Acuna


Hi everybody, hope everyone is doing great.

*i) Introduction:*
I noticed the following exception when trying to describe a consumer group with the Kafka 2.7.1 tools against a cluster whose brokers run 2.3.1:

 
{code:java}
./kafka-consumer-groups.sh --bootstrap-server <broker-host:port> --describe --group order-validations{code}
{code:java}
Error: Executing consumer group command failed due to null
java.lang.NullPointerException
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$6(ConsumerGroupCommand.scala:579)
 at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
 at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
 at scala.collection.convert.JavaCollectionWrappers$JSetWrapper.map(JavaCollectionWrappers.scala:180)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$5(ConsumerGroupCommand.scala:578)
 at scala.collection.immutable.List.flatMap(List.scala:293)
 at scala.collection.immutable.List.flatMap(List.scala:79)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:574)
 at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
 at scala.collection.mutable.Growable.addAll(Growable.scala:62)
 at scala.collection.mutable.Growable.addAll$(Growable.scala:59)
 at scala.collection.mutable.HashMap.addAll(HashMap.scala:117)
 at scala.collection.mutable.HashMap$.from(HashMap.scala:570)
 at scala.collection.mutable.HashMap$.from(HashMap.scala:563)
 at scala.collection.MapOps$WithFilter.map(Map.scala:358)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:569)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:369)
 at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:76)
 at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
 at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala){code}
 

Running the same command with an older AdminClient (2.3.1) fails differently:
{code:java}
Error: Executing consumer group command failed due to java.lang.IllegalArgumentException: Invalid negative offset
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Invalid negative offset
 at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
 at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
 at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
 at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getCommittedOffsets(ConsumerGroupCommand.scala:595)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:421)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$Lambda$131/000000004CB1EFD0.apply(Unknown Source)
 at scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:827)
 at scala.collection.TraversableLike$WithFilter$$Lambda$132/000000004CD49E20.apply(Unknown Source)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
 at scala.collection.mutable.HashMap$$Lambda$133/000000004CD4A4F0.apply(Unknown Source)
 at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
 at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
 at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:826)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:419)
 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:312)
 at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
 at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
 at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
 at org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
 at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
 at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160)
 at java.lang.Thread.run(Thread.java:820){code}
 

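The difference between the two outputs comes from the change made in KAFKA-9507: the 2.3.1 client fails while constructing an OffsetAndMetadata for a negative offset, whereas the 2.7.1 client stores null for that partition instead.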
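The following minimal Java sketch illustrates that difference outside Kafka. All names in it are hypothetical: `OffsetAndMetadataStub` only reproduces the 'Invalid negative offset' check from the 2.3.1 stack trace, partitions are plain strings instead of TopicPartition, and `putBefore9507` / `putAfter9507` merely mimic the two code paths described in this report rather than the actual KafkaAdminClient source.

```java
import java.util.Map;

final class Kafka9507Sketch {
    // Hypothetical stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata,
    // reproducing only the negative-offset check seen in the 2.3.1 stack trace.
    static final class OffsetAndMetadataStub {
        private final long offset;

        OffsetAndMetadataStub(long offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("Invalid negative offset");
            }
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Mimics the 2.3.1 client: the object is always constructed, so an offset of -1
    // throws IllegalArgumentException (the real client surfaces it through the future).
    static void putBefore9507(Map<String, OffsetAndMetadataStub> listing, String partition, long offset) {
        listing.put(partition, new OffsetAndMetadataStub(offset));
    }

    // Mimics the KAFKA-9507 change: a negative offset becomes an explicit null value.
    static void putAfter9507(Map<String, OffsetAndMetadataStub> listing, String partition, long offset) {
        if (offset < 0) {
            listing.put(partition, null);
        } else {
            listing.put(partition, new OffsetAndMetadataStub(offset));
        }
    }
}
```

With the first helper the lookup fails outright; with the second it succeeds but leaves a null value that every caller now has to handle.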

*ii) Problem:* 
The committed offsets for some partitions arrive at _ConsumerGroupCommand_ as null. For each such partition that is assigned to a consumer, reading the offset value throws a java.lang.NullPointerException, because ConsumerGroupCommand maps over a null OffsetAndMetadata.

*iii) Example:*
a) +GroupID information (from describeConsumerGroups() method):+
(groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])

b) +Committed Offsets information (from getCommittedOffsets() method):+
Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata\{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata\{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)

As shown above, member order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6 is assigned all ten partitions, but the committed offsets reported for partitions 0, 2, 4, 5, 7 and 8 are null (partitions 6 and 9 have no entry at all).
Reading the committed offset for rtl_orderReceive-0 then fails at .map(_.offset): the key is present but mapped to null, so the lookup yields Some(null) and _.offset is evaluated on a null reference. The value is null because the offset of that partition is -1, and KAFKA-9507 maps negative offsets to null.
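The failure mode can be reproduced outside Kafka. The Java sketch below is an analogy, not the ConsumerGroupCommand code (which is Scala): it rebuilds the committed-offsets map from the example with partition names as plain strings and a hypothetical `OffsetAndMetadataStub` in place of the real class. `keyPresenceLookup` treats a missing key as "no offset" but a key mapped to null as present, which is roughly what the Scala lookup does, so it throws for rtl_orderReceive-0; `nullSafeLookup` treats both cases the same way. All class and method names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

final class NullOffsetLookupSketch {
    // Hypothetical stand-in for OffsetAndMetadata; only the offset is modeled.
    static final class OffsetAndMetadataStub {
        private final long offset;

        OffsetAndMetadataStub(long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Committed offsets from the example: null values for partitions
    // 0, 2, 4, 5, 7 and 8, and no entry at all for partitions 6 and 9.
    static Map<String, OffsetAndMetadataStub> exampleCommittedOffsets() {
        Map<String, OffsetAndMetadataStub> committed = new HashMap<>();
        for (int p : new int[] {0, 2, 4, 5, 7, 8}) {
            committed.put("rtl_orderReceive-" + p, null);
        }
        committed.put("rtl_orderReceive-1", new OffsetAndMetadataStub(39));
        committed.put("rtl_orderReceive-3", new OffsetAndMetadataStub(33));
        return committed;
    }

    // Analogy for committedOffsets.get(tp).map(_.offset): a missing key yields
    // "no offset" (like None), but a key mapped to null counts as present
    // (like Some(null)), so reading its offset throws a NullPointerException.
    static OptionalLong keyPresenceLookup(Map<String, OffsetAndMetadataStub> committed, String partition) {
        if (!committed.containsKey(partition)) {
            return OptionalLong.empty();
        }
        OffsetAndMetadataStub value = committed.get(partition);
        return OptionalLong.of(value.offset());
    }

    // Null-safe variant: a null value and a missing key both mean "no committed offset".
    static OptionalLong nullSafeLookup(Map<String, OffsetAndMetadataStub> committed, String partition) {
        OffsetAndMetadataStub value = committed.get(partition);
        return value == null ? OptionalLong.empty() : OptionalLong.of(value.offset());
    }
}
```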

*iv) Proposals:*

a) +Fix locally on the ConsumerGroupCommand:+
Add a filter to the committed offsets arriving from upstream to catch these edge cases. That way, even if upstream returns null values instead of an OffsetAndMetadata, describeGroups still works and returns the consumer group description.

From:
{code:java}
val committedOffsets = getCommittedOffsets(groupId){code}
To:
{code:java}
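// isInstanceOf[OffsetAndMetadata] is false for null, so partitions whose committed offset is null are dropped
val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata]){code}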
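In Java terms, filter (a) amounts to copying the map without its null-valued entries. The sketch below is a minimal illustration, not ConsumerGroupCommand code; `CommittedOffsetsFilterSketch` and `withoutNullValues` are hypothetical names, and the method is generic so it applies equally to a map of partition names to offsets or of TopicPartition to OffsetAndMetadata.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CommittedOffsetsFilterSketch {
    // Returns a copy of the map without entries whose value is null, which is what
    // filter(_._2.isInstanceOf[OffsetAndMetadata]) achieves in the Scala proposal.
    // The input map is left unchanged and the iteration order is preserved.
    static <K, V> Map<K, V> withoutNullValues(Map<K, V> offsets) {
        Map<K, V> filtered = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : offsets.entrySet()) {
            if (entry.getValue() != null) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

After filtering, a partition whose committed offset was null looks the same as rtl_orderReceive-6 or rtl_orderReceive-9 in the example, that is, a partition with no committed offset, which the lookup already handles without throwing.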
 


b) +Fix upstream in KafkaAdminClient's listConsumerGroupOffsets method:+
This relates to KAFKA-9507, where negative offsets were handled by explicitly mapping the topicPartition to null:
{code:java}
if (offset < 0) {
    groupOffsetsListing.put(topicPartition, null);
} else {
    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
}{code}
 

That approach prevents org.apache.kafka.clients.consumer.OffsetAndMetadata from throwing an _'Invalid negative offset'_ error, but it affects downstream callers of KafkaAdminClient's listConsumerGroupOffsets method (such as the one behind kafka-consumer-groups.sh).

The proposal is to skip topic partitions whose offsets are negative, so that no entry is returned for them:
{code:java}
if (offset >= 0) {
    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
}{code}
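A self-contained Java sketch of proposal (b) follows. It is not the KafkaAdminClient source: `rawOffsets` stands in for the per-partition offsets decoded from the broker response, partitions are plain strings, and `SkipNegativeOffsetsSketch`, `buildListing` and `OffsetAndMetadataStub` (which keeps the same 'Invalid negative offset' check) are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SkipNegativeOffsetsSketch {
    // Hypothetical stand-in for OffsetAndMetadata with the same negative-offset check.
    static final class OffsetAndMetadataStub {
        private final long offset;

        OffsetAndMetadataStub(long offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("Invalid negative offset");
            }
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Proposal (b): only partitions with a non-negative offset get an entry, so the
    // constructor never sees a negative offset and the result contains no null values.
    static Map<String, OffsetAndMetadataStub> buildListing(Map<String, Long> rawOffsets) {
        Map<String, OffsetAndMetadataStub> groupOffsetsListing = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : rawOffsets.entrySet()) {
            long offset = entry.getValue();
            if (offset >= 0) {
                groupOffsetsListing.put(entry.getKey(), new OffsetAndMetadataStub(offset));
            }
        }
        return groupOffsetsListing;
    }
}
```

The design trade-off is that callers can no longer tell a partition reported with a negative offset apart from one that was not reported at all; both simply have no entry.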
 

This would remove negative offsets from the listConsumerGroupOffsets result and guarantee that every returned value is a valid OffsetAndMetadata. It still handles negative offsets, as KAFKA-9507 does, but without affecting downstream methods that expect an OffsetAndMetadata rather than a null value.

I think the second approach is cleaner because downstream methods no longer have to handle the null edge case, which can lead to exceptions (as seen above).

I have been working on both approaches and am ready to prepare a PR. What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)