Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/10 01:15:18 UTC

[GitHub] [kafka] IgnacioAcunaF opened a new pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

IgnacioAcunaF opened a new pull request #10858:
URL: https://github.com/apache/kafka/pull/10858


   **Jira:** https://issues.apache.org/jira/browse/KAFKA-12926
   
   Instead of setting partitions with negative offsets to "null" (as in KAFKA-9507), this PR aims to skip those cases in the returned list, because setting them to "null" can cause java.lang.NullPointerExceptions in downstream methods that try to access the data: they expect an _OffsetAndMetadata_ and instead encounter null values.
   
   For example, at ConsumerGroupCommand.scala at core:
   ```scala
     val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
               .map { topicPartition =>
                 topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
               }.toMap
   ```
   If topicPartition has a negative offset, committedOffsets.get(topicPartition) is null (as defined in KAFKA-9507), which translates into null.map(_.offset) and leads to: _Error: Executing consumer group command failed due to null
   java.lang.NullPointerException_
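   
   For illustration, here is a minimal, self-contained sketch of the failure mode and of the kind of guard this PR is after (the object name, sample data, and the `collect`-based filter are illustrative assumptions, not the exact patch):
   ```scala
   import org.apache.kafka.clients.consumer.OffsetAndMetadata
   import org.apache.kafka.common.TopicPartition
   
   object NullSafeOffsetsSketch {
     def main(args: Array[String]): Unit = {
       // Committed offsets as the tool may see them: a null value stands for a
       // partition whose committed offset was negative (see KAFKA-9507).
       val committedOffsets: Map[TopicPartition, OffsetAndMetadata] = Map(
         new TopicPartition("testTopic1", 0) -> new OffsetAndMetadata(100),
         new TopicPartition("testTopic1", 1) -> null
       )
   
       // The typed pattern does not match null, so null entries are skipped and
       // .offset is only ever called on real OffsetAndMetadata values.
       val safeOffsets: Map[TopicPartition, Long] = committedOffsets.collect {
         case (tp, om: OffsetAndMetadata) => tp -> om.offset
       }
   
       println(safeOffsets) // Map(testTopic1-0 -> 100)
     }
   }
   ```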
   
   Unit tests were added to assert that topic partitions with an INVALID_OFFSET are not included in the returned list of the consumer group's offsets, so the downstream methods receive only valid _OffsetAndMetadata_ information.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-858703098


   PING @hachikuji @apovzner as I saw you on [KAFKA-9507](https://github.com/apache/kafka/pull/8057)





[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-862046292


   @dajac Thanks again for the comments and support! :)
   Updated the PR. I think I improved the unit test: reverted some changes and kept it isolated from the others while preserving the same logic. Tried to cover all the possible scenarios:
   
   - a) Assigned topic partition with no offset data at all -> Response: None
   - b) Assigned topic partition with a valid OffsetAndMetadata -> Response: Long
   - c) Assigned topic partition with an invalid OffsetAndMetadata (null case) -> Response: None
   - d) Unassigned topic partition with no offset data at all -> Response: nothing; not in the resulting list
   - e) Unassigned topic partition with a valid OffsetAndMetadata -> Response: Long
   - f) Unassigned topic partition with an invalid OffsetAndMetadata (null case) -> Response: None
   
   Also modified getPartitionOffsets a little so it can call collectConsumerAssignment directly, as suggested.
   
   **PS:** As I posted in the comments earlier, I noticed that the sibling test, _testAdminRequestsForDescribeOffsets_, lacks validation for assigned topic partitions (it currently only validates unassigned topic partitions, basically because no topic is assigned to the testing consumer group at its initialization). This is addressed in the new test _testAdminRequestsForDescribeNegativeOffsets_, and I think I could complement the former one.
   
   What do you think? Is it worth opening a new PR to address that separately?





[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-863611362


   @dajac Updated the PR. Passed the param directly to _collectConsumerAssignment_ as suggested, which makes the code cleaner. Also added a commit to fix an error I encountered on ARM compilation from the new unit test (from Mockito).





[GitHub] [kafka] dajac commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-859545300


   @IgnacioAcunaF Thanks for the PR! I am a bit on the fence regarding modifying the admin API here. Users might expect 'null' in this case.
   
   Did you consider handling this in the command line tool? Also, it would be good to add a unit test for the bug.





[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r659040080



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
    Once defined that way, the test runs OK.
   Does that make sense?







[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r651772882



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -106,7 +106,8 @@ class ConsumerGroupServiceTest {
   }
 
   private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
-    val offsets = topicPartitions.map(_ -> new OffsetAndMetadata(100)).toMap.asJava
+    // Half of the partitions of the testing topics are set to have a negative integer offset (null value [KAFKA-9507 for reference])
+    val offsets = topicPartitions.zipWithIndex.map{ case (tp, i) => tp -> ( if(i % 2 == 0) null else new OffsetAndMetadata(100) ) }.toMap.asJava

Review comment:
    Yes, of course. It makes sense to keep tests isolated to avoid possible conflicts in the future. Will do that.







[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r659051878



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
    And the reason the test was passing before is this:
   
   ```
    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
         val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
         val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
         ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
           (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
         }
       }
   ```
   
   The expectedOffsetsUnassigned was a subset of commitedOffsets (`commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap`)
   
   The expectedOffsetsAssignedTopics was a subset of endOffsets (`endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap`)
   
   It was defined that way to preserve the logic of `val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }`. 
   
   But your suggestion of using endOffsets as the filter in both cases seems cleaner to me. I just needed to adjust the test case.







[GitHub] [kafka] dajac merged pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #10858:
URL: https://github.com/apache/kafka/pull/10858


   





[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r650434257



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       @dajac 
   That approach seems nicer because it keeps the change very narrow.
   Thanks for your comment/feedback.
   Updated the PR.
   
   To be completely honest, I've been struggling all day with the unit test, because I am not able to force a KafkaConsumer to start at a -1 offset (it always starts at 0, which makes sense given OffsetAndMetadata's _IllegalArgumentException("Invalid negative offset")_). I don't know for what particular reason that consumer group got into that condition:
   
   a) GroupID information (from describeConsumerGroups() method):
   (groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])
   
   b) Committed offsets information (from getCommittedOffsets() method):
   Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)
   
   Without the change:
   ![image](https://user-images.githubusercontent.com/31544929/121788612-99c39300-cb9c-11eb-8aea-4d8ed1df97a9.png)
   
   With the change:
   ![image](https://user-images.githubusercontent.com/31544929/121788562-228dff00-cb9c-11eb-9c3f-985ef02d26e2.png)
   

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
    I was stuck on DescribeConsumerGroupTest. You are right; looking at ConsumerGroupServiceTest, it should do the trick. Will try that.
   Thanks for the tip.







[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657483179



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }

Review comment:
    Yes, that's a good idea. Going to unify both cases into a single argument matcher.







[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657421072



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava

Review comment:
    Perfect, makes sense. Will do that.







[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r651770604



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -580,7 +581,8 @@ object ConsumerGroupCommand extends Logging {
             groupId,
             Option(consumerGroup.coordinator),
             unassignedPartitions.keySet.toSeq,
-            unassignedPartitions.map { case (tp, offset) => tp -> Some(offset.offset) },
+            // Because there could be cases where the last commited offset is a negative integer (which translates to null [KAFKA-9507 for reference]), the following map function could lead to a NullPointerException. Two possible types are possible: OffsetsAndMetadata or null (which gets translated to None to avoid further exceptions)
+            unassignedPartitions.map { case (tp, offset) => tp -> ( if(offset.isInstanceOf[OffsetAndMetadata]) Some(offset.offset) else None ) },

Review comment:
    Absolutely right, the same helper function works for both cases and keeps the code cleaner and consistent. Going to change that.
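   
   For context, one hypothetical shape for such a helper (name and placement are assumptions; see the merged diff for the final form):
   ```scala
   import org.apache.kafka.clients.consumer.OffsetAndMetadata
   
   // A null OffsetAndMetadata (i.e. a negative committed offset, per KAFKA-9507)
   // becomes None, a real one becomes Some(offset), so the assigned and
   // unassigned partition paths can share the same conversion.
   def getPartitionOffset(offsetAndMetadata: OffsetAndMetadata): Option[Long] =
     Option(offsetAndMetadata).map(_.offset)
   ```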







[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-868867986


   Encountered an error on compileTestScala, so changed
   `endOffsets.view.filterKeys(assignedTopicPartitions.contains).toMap.asJava`
   
   to
   
   `endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava`
   
   
   
   > Task :core:compileTestScala
   [Error] /mnt/c/Users/IgnacioAndresAcunaFr/dev/IgnacioAcunaF/kafka/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala:117: value filterKeys is not a member of scala.collection.IterableView[(org.apache.kafka.common.TopicPartition, org.apache.kafka.common.KafkaFuture[org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo]),scala.collection.immutable.Map[org.apache.kafka.common.TopicPartition,org.apache.kafka.common.KafkaFuture[org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo]]]
   [Error] /mnt/c/Users/IgnacioAndresAcunaFr/dev/IgnacioAcunaF/kafka/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala:121: value filterKeys is not a member of scala.collection.IterableView[(org.apache.kafka.common.TopicPartition, org.apache.kafka.common.KafkaFuture[org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo]),scala.collection.immutable.Map[org.apache.kafka.common.TopicPartition,org.apache.kafka.common.KafkaFuture[org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo]]]








[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657158355



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava

Review comment:
       I would remove all the comments and only put one before `offsets` which explains that certain partitions are not present and certain have `null` or something along these lines.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {

Review comment:
       `describeGroupsResult` is used only once in the test. It seems that we could simply declare a variable which contains the result that we want to return.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet

Review comment:
    So here, we have `testTopicPartition4` and `testTopicPartition5` because `testTopicPartition3` is not in `offsets`. Is it expected to not have `testTopicPartition3`? Personally, I would prefer to list the partitions explicitly here. It makes the test easier to read, in my opinion.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }

Review comment:
       I suppose that we are using an argument matcher because `OffsetSpec` is not comparable. Am I getting this right?
   
   If this is the case, as `offsetsArgMatcherUnassignedTopics` and `offsetsArgMatcherAssignedTopics` are nearly identical, I wonder if we could define an argument matcher which receives the expected `Map` as an argument. Would this be possible?
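   
   A sketch of such a parameterized matcher, assuming it lives inside `ConsumerGroupServiceTest` next to the helpers quoted above (imports and naming are assumptions, not the final code):
   ```scala
   import java.util
   import scala.jdk.CollectionConverters._
   import org.apache.kafka.clients.admin.OffsetSpec
   import org.apache.kafka.common.TopicPartition
   import org.mockito.ArgumentMatchers
   
   // One matcher for both cases: it checks that the requested partitions match
   // the expected set and that every spec asks for the latest offset.
   def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): util.Map[TopicPartition, OffsetSpec] =
     ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
       map.keySet.asScala == expectedPartitions &&
         map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
     }
   ```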

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any())
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any())

Review comment:
    Any reason why we don't use `when(...).thenReturn(...)` here as well? It seems that it should work too.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any())
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any())
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    // Results should have information for all assigned topic partition (even if there is not Offset's information at all, because they get fills with None)
+    // Results should have information only for unassigned topic partitions if and only if there is information about them (included with null values)
+    assertEquals(assignedTopicPartitions.size + unassignedTopicPartitions.size , assignments.get.size)
+    assignments.map( results =>
+      results.map( partitionAssignmentState =>
+        (partitionAssignmentState.topic, partitionAssignmentState.partition) match {
+          case (Some("testTopic1"), Some(0)) => assertEquals(None, partitionAssignmentState.offset)
+          case (Some("testTopic1"), Some(1)) => assertEquals(Some(100), partitionAssignmentState.offset)
+          case (Some("testTopic1"), Some(2)) => assertEquals(None, partitionAssignmentState.offset)
+          case (Some("testTopic2"), Some(1)) => assertEquals(Some(100), partitionAssignmentState.offset)
+          case (Some("testTopic2"), Some(2)) => assertEquals(None, partitionAssignmentState.offset)
+          case _ => assertTrue(false)
+    }))

Review comment:
    I am not a fan of doing assertions like this. I would prefer to use `assertEquals` if possible because the error message is more meaningful in case of failure. We could do something like this:
   
   ```
   val returnedOffsets = assignmentsOpt.map { assignments =>
     assignments.map { assignment =>
       new TopicPartition(....) -> assignment.offset
     }.toMap
   }.getOrElse(Map.empty)
   
   val expectedOffsets = Map(
      ...
   )
   
   assertEquals(expectedOffsets, returnedOffsets)
   ```
   
   This would replace all the assertions.
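   
   For illustration, filling this pattern in with the partitions and expected values used elsewhere in this test (variable names are assumptions; `assignments` is the `Option[Seq[PartitionAssignmentState]]` returned by `collectGroupOffsets` above):
   ```scala
   // Build the map actually returned by the command from the assignment states.
   val returnedOffsets = assignments.map { results =>
     results.map { assignment =>
       new TopicPartition(assignment.topic.get, assignment.partition.get) -> assignment.offset
     }.toMap
   }.getOrElse(Map.empty)
   
   // Expected offsets per partition: None where the committed offset was null or
   // missing, Some(100) where a valid OffsetAndMetadata was returned.
   val expectedOffsets = Map(
     new TopicPartition("testTopic1", 0) -> None,
     new TopicPartition("testTopic1", 1) -> Some(100),
     new TopicPartition("testTopic1", 2) -> None,
     new TopicPartition("testTopic2", 1) -> Some(100),
     new TopicPartition("testTopic2", 2) -> None
   )
   assertEquals(expectedOffsets, returnedOffsets)
   ```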







[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-870107728









[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658978444



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic partitions get valid OffsetAndMetadata values, others get null values (negative offsets) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava

Review comment:
       Actually, I included that case based on what I saw on that consumer group:
   
   This is the consumer-group state:
   `(groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])`
   
   It has all of the rtl_orderReceive partitions assigned, but when getting the committed offsets:
   
   `Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)`
   
   Even though partition 6 was assigned, there are no committed values for it (not even a -1).
   That covers the assigned-partitions case. For unassigned partitions, thinking about it now, since they are the subset of committed offsets that aren't assigned, it makes no sense to have an undefined committed offset for an unassigned partition (if there were no committed offset, the unassigned partition wouldn't exist in the first place).
   
   So I am thinking on:
   
   ```
   val commitedOffsets = Map(
         testTopicPartition1 -> new OffsetAndMetadata(100),
         testTopicPartition2 -> null,
         testTopicPartition3 -> new OffsetAndMetadata(100),
         testTopicPartition4 -> new OffsetAndMetadata(100),
         testTopicPartition5 -> null,
       ).asJava
   ```
   
   Just leaving testTopicPartition0 undefined, but defining testTopicPartition3 (unassigned).
   Does that make sense?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r651515816



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -580,7 +581,8 @@ object ConsumerGroupCommand extends Logging {
             groupId,
             Option(consumerGroup.coordinator),
             unassignedPartitions.keySet.toSeq,
-            unassignedPartitions.map { case (tp, offset) => tp -> Some(offset.offset) },
+            // Because there can be cases where the last committed offset is a negative integer (which translates to null; see KAFKA-9507), the following map function could lead to a NullPointerException. Two values are possible: an OffsetAndMetadata or null (which gets translated to None to avoid further exceptions)
+            unassignedPartitions.map { case (tp, offset) => tp -> ( if(offset.isInstanceOf[OffsetAndMetadata]) Some(offset.offset) else None ) },

Review comment:
       I am still not a fan of using `offset.isInstanceOf[OffsetAndMetadata]` in this case...
   
   Now that we need the same logic twice, what about defining a small helper function which takes a `TopicPartition` and returns its committed offset if not null?
   
   ```
   // The admin client returns `null` as a value to indicate that there is no committed offset for a partition.
   def getPartitionOffset(tp: TopicPartition): Option[Long] = committedOffsets.get(tp).filter(_ != null).map(_.offset)
   ```
   
   We could use it in both places.
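   
   Concretely, the two call sites in `collectGroupOffsets` would then look roughly like this (a sketch only, using the names from the surrounding code; it matches the shape of the later revisions quoted in this thread):
   
   ```
   // Assigned partitions (inside the per-consumer loop):
   val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
     .map { topicPartition =>
       topicPartition -> getPartitionOffset(topicPartition)
     }.toMap
   
   // Unassigned partitions (the committed offset is no longer dereferenced directly):
   unassignedPartitions.map { case (tp, _) => tp -> getPartitionOffset(tp) }
   ```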

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -106,7 +106,8 @@ class ConsumerGroupServiceTest {
   }
 
   private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
-    val offsets = topicPartitions.map(_ -> new OffsetAndMetadata(100)).toMap.asJava
+    // Half of the partitions of the testing topics are set to have a negative integer offset (null value [KAFKA-9507 for reference])
+    val offsets = topicPartitions.zipWithIndex.map{ case (tp, i) => tp -> ( if(i % 2 == 0) null else new OffsetAndMetadata(100) ) }.toMap.asJava

Review comment:
       I am not comfortable with hijacking all existing tests like this. Would it be possible to define new unit test(s) that trigger the identified issue(s)? I think it will help us avoid regressing in the future if we have explicit test(s).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r650349885



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       Did you consider handling the `null` as follows?
   
   ```
   topicPartition -> committedOffsets.get(topicPartition).filter(_ != null).map(_.offset)
   ```
   
   I feel that this would be a bit clearer, as it explicitly handles the `null` case. We could also add a small comment to explain why it is there.
   
   Could you also add a unit test for the bug please?
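   
   A minimal, self-contained illustration of why the `filter(_ != null)` matters (topic name made up for the example):
   
   ```
   import org.apache.kafka.clients.consumer.OffsetAndMetadata
   import org.apache.kafka.common.TopicPartition
   
   val tp = new TopicPartition("someTopic", 0)
   // The committed-offsets map can contain explicit null values (see KAFKA-9507)
   val committedOffsets: Map[TopicPartition, OffsetAndMetadata] = Map(tp -> null)
   
   // Map#get wraps the stored value as-is, so a null value becomes Some(null):
   // committedOffsets.get(tp).map(_.offset)                 // throws NullPointerException
   committedOffsets.get(tp).filter(_ != null).map(_.offset)  // None, no exception
   ```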

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -559,7 +559,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId)
+        val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])

Review comment:
       Thanks. Regarding the test, we could perhaps add a unit test in ConsumerGroupServiceTest which relies on a mocked admin client. Would this work?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652249976



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -110,6 +132,12 @@ class ConsumerGroupServiceTest {
     AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
   }
 
+  private def listGroupNegativeOffsetsResult: ListConsumerGroupOffsetsResult = {
+    // Half of the partitions of the testing topics are set to have a negative integer offset (null value [KAFKA-9507 for reference])
+    val offsets = topicPartitions.zipWithIndex.map{ case (tp, i) => tp -> ( if(i % 2 == 0) null else new OffsetAndMetadata(100) ) }.toMap.asJava

Review comment:
       Yes. I am explicitly generating the test cases in the function, something like this (work in progress):
   
   
   ```
     def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
       val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
       val groupService = consumerGroupService(args)
   
       val testTopicPartition0 = new TopicPartition("testTopic", 0);
       val testTopicPartition1 = new TopicPartition("testTopic", 1);
       val testTopicPartition2 = new TopicPartition("testTopic", 2);
       val testTopicPartition3 = new TopicPartition("testTopic", 3);
       val testTopicPartition4 = new TopicPartition("testTopic", 4);
       val testTopicPartition5 = new TopicPartition("testTopic", 5);
       val testTopicPartition6 = new TopicPartition("testTopic", 6);
       val testTopicPartition7 = new TopicPartition("testTopic", 7);
       val testTopicPartition8 = new TopicPartition("testTopic", 8);
       val testTopicPartition9 = new TopicPartition("testTopic", 9);
   
       val offsets = Map(
         testTopicPartition0 -> new OffsetAndMetadata(100),
         testTopicPartition1 -> new OffsetAndMetadata(100),
         testTopicPartition2 -> new OffsetAndMetadata(100),
         testTopicPartition3 -> new OffsetAndMetadata(100),
         testTopicPartition4 -> new OffsetAndMetadata(100),
         testTopicPartition5 -> new OffsetAndMetadata(100),
         testTopicPartition6 -> new OffsetAndMetadata(100),
         testTopicPartition7 -> new OffsetAndMetadata(100),
         testTopicPartition8 -> new OffsetAndMetadata(100),
         testTopicPartition9 -> null,
       ).asJava
   ```
   Does this make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652242501



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(listGroupNegativeOffsetsResult)
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(listOffsetsResult)
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
       Yes, I could add that kind of assertion.
   Regarding the second question, I encountered something interesting that we may need to improve in the original test. This is the original (as-is) function for describing the ConsumerGroup:
   
   ```
   private def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
       val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", null)
       val description = new ConsumerGroupDescription(group,
         true,
         Collections.singleton(member1),
         classOf[RangeAssignor].getName,
         groupState,
         new Node(1, "localhost", 9092))
       new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
     }
     ```
     
   As you can see, the last parameter of the member1 declaration is _null_. That is the assignment of partitions to the consumer. It is always set to null, so no partitions are assigned to the consumer group. For example, this is the printout of the consumerGroups in _testAdminRequestsForDescribeOffsets_ (with no changes so far):
    
    (groupId=testGroup, isSimpleConsumerGroup=true, members=(memberId=member1, groupInstanceId=instance1, clientId=client1, host=host1, assignment=(topicPartitions=)), partitionAssignor=org.apache.kafka.clients.consumer.RangeAssignor, state=Stable, coordinator=localhost:9092 (id: 1 rack: null), authorizedOperations=[])
   
   That is the result of `val consumerGroups = describeConsumerGroups(groupIds)` in ConsumerGroupCommand's collectGroupOffsets function for the regular test. As you can see, no partitions are assigned to the consumer: `assignment=(topicPartitions=)`
   
   So the regular test skips the _assignedTopicPartitions_ part and goes directly to _unassignedPartitions_, which means the test is **only covering the unassignedPartitions case.**
   
   Do you think this is something worth improving?
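   
   For reference, the assigned-partitions path can be exercised by giving member1 a real `MemberAssignment` instead of `null` — a sketch, which is roughly what the updated test quoted elsewhere in this thread ends up doing (it reuses the imports already present in `ConsumerGroupServiceTest`: `MemberDescription`, `MemberAssignment`, `TopicPartition`, `Optional`, and the collection converters for `asJava`):
   
   ```
   val assignedTopicPartitions = Set(new TopicPartition("testTopic1", 0), new TopicPartition("testTopic1", 1))
   // A non-null assignment makes the described group report assigned partitions,
   // so collectGroupOffsets also walks the assignedTopicPartitions branch
   val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1",
     new MemberAssignment(assignedTopicPartitions.asJava))
   ```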




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652153471



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
         val committedOffsets = getCommittedOffsets(groupId)
+        // The admin client returns `null` as a value to indicate that there is no committed offset for a partition. The following getPartitionOffset function avoids a NullPointerException by filtering out those null values.
+        def getPartitionOffset(tp: TopicPartition): Option[Long] = committedOffsets.get(tp).filter(_ != null).map(_.offset)
         var assignedTopicPartitions = ListBuffer[TopicPartition]()
         val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
           .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
           val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
           assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
           val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
             .map { topicPartition =>
-              topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
+              topicPartition -> getPartitionOffset(topicPartition)

Review comment:
       Not directly, as `collectConsumerAssignment` expects its getPartitionOffset parameter to be a `TopicPartition => Option[Long]`, whereas getPartitionOffset as defined returns an Option[Long] for a single specified TopicPartition.
   It can be modified to accept a sequence of TopicPartitions and return a map though, something like this:
   
   ```
   def getPartitionOffsets(tp: Seq[TopicPartition]): TopicPartition => Option[Long] = tp.map { topicPartition => topicPartition -> committedOffsets.get(topicPartition).filter(_ != null).map(_.offset)}.toMap
   ```
   
   That way it can be passed directly to collectConsumerAssignment, by changing:
   
   ```
   val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
               .map { topicPartition =>
                 topicPartition -> getPartitionOffset(topicPartition)
               }.toMap 
               
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
               partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
               Some(s"${consumerSummary.clientId}"))
   ```
   
   To:
   
   ```
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
               getPartitionOffsets(consumerSummary.assignment.topicPartitions.asScala.toSeq), Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
               Some(s"${consumerSummary.clientId}"))
   ```
   
   @dajac Do you think this is a good approach?
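   
   A side note on why returning a `Map` works for that parameter: in Scala a `Map[K, V]` already is a `K => V`, so the map built by `getPartitionOffsets` can be handed to `collectConsumerAssignment` wherever a `TopicPartition => Option[Long]` is expected. A tiny standalone sketch:
   
   ```
   val offsetsByKey: Map[String, Option[Long]] = Map("t-0" -> Some(100L), "t-1" -> None)
   val asFunction: String => Option[Long] = offsetsByKey  // Map extends Function1
   asFunction("t-0")  // Some(100)
   // Caveat: applying the map to a key that was never added throws NoSuchElementException,
   // so the Seq passed to getPartitionOffsets has to cover every partition that will be looked up.
   ```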




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF edited a comment on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF edited a comment on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-859209484


   Regarding tests:
   
   - JDK 15 and Scala 2.13: 
   Successful
   _[2021-06-11T00:23:47.258Z] 196 actionable tasks: 105 executed, 91 up-to-date_
   
   - JDK 11 and Scala 2.13
   Failed only on what seems to be a flaky test [KAFKA-8940](https://issues.apache.org/jira/browse/KAFKA-8940)
   [2021-06-10T23:38:40.358Z] org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest > shouldWorkWithRebalance FAILED
   [2021-06-10T23:38:40.358Z]     java.lang.AssertionError: verifying tagg
   _[2021-06-11T00:08:48.428Z] 273 tests completed, 1 failed_
   
   - JDK 8 and Scala 2.12:
   Failed on tests unrelated to the change (primarily ZooKeeper).
   _[2021-06-11T00:14:17.629Z] 1205 tests completed, 36 failed, 8 skipped_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657507129



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      // testTopicPartition0 -> there is no offset information for an assigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for an assigned partition
+      testTopicPartition2 -> null, // there is a null value for an assigned topic partition
+      // testTopicPartition3 -> there is no offset information for an unassigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for an unassigned partition
+      testTopicPartition5 -> null, // there is a null value for an unassigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any())
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any())
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    // Results should have information for all assigned topic partitions (filled with None when there is no offset information at all)
+    // Results should only have information for unassigned topic partitions when there is committed offset information for them (including null values)
+    assertEquals(assignedTopicPartitions.size + unassignedTopicPartitions.size , assignments.get.size)
+    assignments.map( results =>
+      results.map( partitionAssignmentState =>
+        (partitionAssignmentState.topic, partitionAssignmentState.partition) match {
+          case (Some("testTopic1"), Some(0)) => assertEquals(None, partitionAssignmentState.offset)
+          case (Some("testTopic1"), Some(1)) => assertEquals(Some(100), partitionAssignmentState.offset)
+          case (Some("testTopic1"), Some(2)) => assertEquals(None, partitionAssignmentState.offset)
+          case (Some("testTopic2"), Some(1)) => assertEquals(Some(100), partitionAssignmentState.offset)
+          case (Some("testTopic2"), Some(2)) => assertEquals(None, partitionAssignmentState.offset)
+          case _ => assertTrue(false)
+    }))

Review comment:
       👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657481603



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      // testTopicPartition0 -> there is no offset information for an assigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for an assigned partition
+      testTopicPartition2 -> null, // there is a null value for an assigned topic partition
+      // testTopicPartition3 -> there is no offset information for an unassigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for an unassigned partition
+      testTopicPartition5 -> null, // there is a null value for an unassigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any())
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any())

Review comment:
       In fact that was my first approach, but defining a when(...).thenReturn(...) for two different ArgumentMatchers on the same method threw a NullPointerException in Mockito, so I used doAnswer as a workaround.
   But taking the previous comment into account, I am working on a single when(...).thenReturn(...), so it wouldn't be a problem anymore.
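   
   For what it's worth, the NullPointerException during stubbing is most likely the first `argThat` matcher being evaluated against the (null) placeholder arguments of the second `when(...)` call. A matcher that guards against null — the form the test later in this thread adopts — lets two `when(...).thenReturn(...)` stubbings coexist. A sketch, reusing the names and imports from that test:
   
   ```
   def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
     // The null check keeps the matcher safe when Mockito probes it while registering other stubbings
     topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
   }
   
   when(admin.listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)), any()))
     .thenReturn(new ListOffsetsResult(endOffsets.filter { case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))
   when(admin.listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)), any()))
     .thenReturn(new ListOffsetsResult(endOffsets.filter { case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava))
   ```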




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF edited a comment on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF edited a comment on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-858703098


   PING @hachikuji @apovzner (as I saw you on [KAFKA-9507](https://github.com/apache/kafka/pull/8057))


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r659616835



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic partitions get valid OffsetAndMetadata values, others get null values (negative offsets) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava))
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    val returnedOffsets = assignments.map { results =>
+      results.map { assignment =>
+        new TopicPartition(assignment.topic.get, assignment.partition.get) -> assignment.offset
+      }.toMap
+    }.getOrElse(Map.empty)
+    // Results should have information for all assigned topic partitions (filled with None when there is no offset information at all)
+    // Results should only have information for unassigned topic partitions when there is committed offset information for them (including null values)

Review comment:
       Is this comment still relevant? It seems that we have a value for all partitions now.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic partitions get valid OffsetAndMetadata values, others get null values (negative offsets) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava))

Review comment:
       nit: Same here.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic partitions get valid OffsetAndMetadata values, others get null values (negative offsets) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))

Review comment:
       nit: We usually put a space before the `{`.
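   
   i.e. the nit amounts to:
   
   ```
   // before
   endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }
   // after (space before the opening brace)
   endOffsets.filter { case (tp, _) => assignedTopicPartitions.contains(tp) }
   ```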




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-859209484






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r660173021



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic partitions get valid OffsetAndMetadata values, others get null values (negative offsets) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))

Review comment:
       Sure, will add 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r651903223



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
         val committedOffsets = getCommittedOffsets(groupId)
+        // The admin client returns `null` as a value to indicate that there is no committed offset for a partition.

Review comment:
       nit: I think that we can shorten the comment a bit. `The admin client returns `null` as a value to indicate that there is no committed offset for a partition.` seems sufficient to me.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -110,6 +132,12 @@ class ConsumerGroupServiceTest {
     AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
   }
 
+  private def listGroupNegativeOffsetsResult: ListConsumerGroupOffsetsResult = {
+    // Half of the partitions of the testing topics are set to have a negative integer offset (null value [KAFKA-9507 for reference])
+    val offsets = topicPartitions.zipWithIndex.map{ case (tp, i) => tp -> ( if(i % 2 == 0) null else new OffsetAndMetadata(100) ) }.toMap.asJava

Review comment:
       Could we revert this change and use a `ListConsumerGroupOffsetsResult` tailored only for `testAdminRequestsForDescribeNegativeOffsets`?
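   
   Roughly what this could look like — and roughly what the updated test quoted in this thread ends up doing — is to build the committed-offsets map inside the new test only and feed it through `AdminClientTestUtils` (a sketch; the partition values are illustrative):
   
   ```
   // Inside testAdminRequestsForDescribeNegativeOffsets only:
   val commitedOffsets = Map(
     new TopicPartition("testTopic1", 1) -> new OffsetAndMetadata(100),
     new TopicPartition("testTopic1", 2) -> null  // a "negative"/absent committed offset comes back as null
   ).asJava
   
   when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
     .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
   ```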

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -580,7 +582,7 @@ object ConsumerGroupCommand extends Logging {
             groupId,
             Option(consumerGroup.coordinator),
             unassignedPartitions.keySet.toSeq,
-            unassignedPartitions.map { case (tp, offset) => tp -> Some(offset.offset) },
+            unassignedPartitions.map { case (tp, offset) => tp -> getPartitionOffset(tp) },

Review comment:
       Same for this one.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(listGroupNegativeOffsetsResult)
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(listOffsetsResult)
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
       Could we also assert that the `offset` is `None` for a partition that does not have it? Does the test also cover the `unassignedPartitions` case? I suppose that it does not, right?
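   
   For example, one way to spot-check a null-offset partition directly (a sketch, using the `testTopic1`/partition-2 naming adopted later in the thread; `assignments` is the result of `collectGroupOffsets`):
   
   ```
   // The row must exist for an assigned partition, but its committed offset must be None
   val nullOffsetRow = assignments.get.find(a => a.topic.contains("testTopic1") && a.partition.contains(2))
   assertTrue(nullOffsetRow.isDefined)
   assertEquals(None, nullOffsetRow.get.offset)
   ```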

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
         val committedOffsets = getCommittedOffsets(groupId)
+        // The admin client returns `null` as a value to indicate that there is no committed offset for a partition. The following getPartitionOffset function avoids a NullPointerException by filtering out those null values.
+        def getPartitionOffset(tp: TopicPartition): Option[Long] = committedOffsets.get(tp).filter(_ != null).map(_.offset)
         var assignedTopicPartitions = ListBuffer[TopicPartition]()
         val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
           .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
           val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
           assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
           val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
             .map { topicPartition =>
-              topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
+              topicPartition -> getPartitionOffset(topicPartition)

Review comment:
       I think that we could avoid computing `partitionOffsets` and directly pass `getPartitionOffset` to `collectConsumerAssignment`. Does it work?
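   
   For reference, a sketch of what passing it directly could look like (Scala eta-expands the local `def` into the expected `TopicPartition => Option[Long]`; the surrounding arguments are the ones already used in `collectGroupOffsets`):
   
   ```
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
     getPartitionOffset, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
     Some(s"${consumerSummary.clientId}"))
   ```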




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652153471



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
         val committedOffsets = getCommittedOffsets(groupId)
+        // The admin client returns `null` as a value to indicate that there is no committed offset for a partition. The following getPartitionOffset function avoids a NullPointerException by filtering out those null values.
+        def getPartitionOffset(tp: TopicPartition): Option[Long] = committedOffsets.get(tp).filter(_ != null).map(_.offset)
         var assignedTopicPartitions = ListBuffer[TopicPartition]()
         val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
           .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
           val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
           assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
           val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
             .map { topicPartition =>
-              topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
+              topicPartition -> getPartitionOffset(topicPartition)

Review comment:
       Not directly. `collectConsumerAssignment` expects its getPartitionOffset parameter to be a `TopicPartition => Option[Long]`, whereas getPartitionOffset as defined returns an `Option[Long]` for a single specified TopicPartition.
   It can be modified to accept a sequence of TopicPartitions though, to something like this:
   
   ```
   def getPartitionOffsets(tp: Seq[TopicPartition]): TopicPartition => Option[Long] = tp.map { topicPartition => topicPartition -> committedOffsets.get(topicPartition).filter(_ != null).map(_.offset)}.toMap
   ```
   
   That way it can be passed directly to collectConsumerAssignment, by changing:
   
   ```
   val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
               .map { topicPartition =>
                 topicPartition -> getPartitionOffset(topicPartition)
               }.toMap 
               
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
               partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
               Some(s"${consumerSummary.clientId}"))
   ```
   
   To:
   
   ```
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
               getPartitionOffsets(consumerSummary.assignment.topicPartitions.asScala.toSeq), Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
               Some(s"${consumerSummary.clientId}"))
   ```
   
   @dajac Do you think this is a good approach?
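   For context, the reason a plain `Map` can be handed over where a `TopicPartition => Option[Long]` is expected is that Scala's `Map[K, V]` is itself a `K => V`. A minimal sketch with hypothetical names (not taken from ConsumerGroupCommand):
   
   ```
   // Minimal sketch: a Map can be passed wherever a function of its key type is expected.
   def lookupOffset(getOffset: String => Option[Long]): Option[Long] =
     getOffset("topic-0")
   
   val offsets: Map[String, Option[Long]] = Map("topic-0" -> Some(100L), "topic-1" -> None)
   println(lookupOffset(offsets)) // prints Some(100)
   ```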




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652187497



##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -580,7 +582,7 @@ object ConsumerGroupCommand extends Logging {
             groupId,
             Option(consumerGroup.coordinator),
             unassignedPartitions.keySet.toSeq,
-            unassignedPartitions.map { case (tp, offset) => tp -> Some(offset.offset) },
+            unassignedPartitions.map { case (tp, offset) => tp -> getPartitionOffset(tp) },

Review comment:
       Same as with the previous approach, it could be changed as follows.
   **From**:
   ```
   val rowsWithoutConsumer = if (unassignedPartitions.nonEmpty) {
             collectConsumerAssignment(
               groupId,
               Option(consumerGroup.coordinator),
               unassignedPartitions.keySet.toSeq,
               unassignedPartitions.map { case (tp, offset) => tp -> getPartitionOffset(tp) },
               Some(MISSING_COLUMN_VALUE),
               Some(MISSING_COLUMN_VALUE),
               Some(MISSING_COLUMN_VALUE)).toSeq
           } else
             Seq.empty
   ```
   **To**:
   ```
   val rowsWithoutConsumer = if (unassignedPartitions.nonEmpty) {
             collectConsumerAssignment(
               groupId,
               Option(consumerGroup.coordinator),
               unassignedPartitions.keySet.toSeq,
               getPartitionOffsets(unassignedPartitions.keySet.toSeq),
               Some(MISSING_COLUMN_VALUE),
               Some(MISSING_COLUMN_VALUE),
               Some(MISSING_COLUMN_VALUE)).toSeq
           } else
             Seq.empty
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657479764



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet

Review comment:
       You are absolutely right, I missed testTopicPartition3. Going to set it explicitly 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-861504636


   Thanks @dajac for your review and comments.
   Just pushed an update taking them into consideration. What do you think?
   
   Agreed, there could potentially be more cases leading to a NullPointerException in the tool for similar reasons. I will also keep an eye out and address any I encounter as a follow-up to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-870568479


   Thanks @dajac for the review and comments.
   Learnt a lot :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-868844354


   Hi @dajac,
   thanks again for the review and support. 
   I made some changes to the PR taking your comments into account.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658995018



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
       Going to do some research, but it seems like the problem I had before: the second `when(..).thenReturn(..)` stub doesn't get matched, so admin.listOffsets is called directly with no mocked results. 
   
   Going to take a look.
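   As a rough sketch of the stubbing direction that ends up working later in this thread — one stub per expected partition set, so each `listOffsets` call matches its own stub (here `admin`, `endOffsets`, `assignedTopicPartitions` and `unassignedTopicPartitions` are the fixtures already defined in the test above):
   
   ```
   import java.util
   import org.apache.kafka.clients.admin.{ListOffsetsResult, OffsetSpec}
   import org.apache.kafka.common.TopicPartition
   import org.mockito.{ArgumentMatcher, ArgumentMatchers}
   import org.mockito.ArgumentMatchers.any
   import org.mockito.Mockito.when
   import scala.jdk.CollectionConverters._
   
   // Matches a listOffsets request whose key set is exactly the expected partitions.
   def matcherFor(expected: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] =
     request => request != null && request.keySet.asScala == expected
   
   // One stub per call shape, so neither call falls through to the default mock answer.
   when(admin.listOffsets(ArgumentMatchers.argThat(matcherFor(assignedTopicPartitions)), any()))
     .thenReturn(new ListOffsetsResult(endOffsets.filter { case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))
   when(admin.listOffsets(ArgumentMatchers.argThat(matcherFor(unassignedTopicPartitions)), any()))
     .thenReturn(new ListOffsetsResult(endOffsets.filter { case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava))
   ```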




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652242501



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(listGroupNegativeOffsetsResult)
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(listOffsetsResult)
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
       Yes, I could add that kind of assertion. 
   Regarding the second question, I encountered something interesting that we may want to improve in the original test. This is the original function (as-is) for describing the ConsumerGroup:
   
   ```
   private def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
       val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", null)
       val description = new ConsumerGroupDescription(group,
         true,
         Collections.singleton(member1),
         classOf[RangeAssignor].getName,
         groupState,
         new Node(1, "localhost", 9092))
       new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
     }
     ```
     
    As you can see, the last parameter of the member1 declaration is _null_. That is the assignment of partitions to the consumer, and it is always set to null, so no partition is ever assigned to the consumer group. For example, this is the printout of the consumerGroups from _testAdminRequestsForDescribeOffsets_ (with no changes so far):
 
 (groupId=testGroup, isSimpleConsumerGroup=true, members=(memberId=member1, groupInstanceId=instance1, clientId=client1, host=host1, assignment=(topicPartitions=)), partitionAssignor=org.apache.kafka.clients.consumer.RangeAssignor, state=Stable, coordinator=localhost:9092 (id: 1 rack: null), authorizedOperations=[])
   
   That is the result of `val consumerGroups = describeConsumerGroups(groupIds)` in ConsumerGroupCommand's collectGroupsOffsets function for the regular test. As you can see, no partitions are assigned to the consumer: `assignment=(topicPartitions=)`
   
   So the regular test skips the _assignedTopicPartitions_ part and goes directly to _unassignedPartitions_, which means it only exercises unassignedPartitions. 
   
   Do you think this is something worth improving?
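   For comparison, a minimal sketch of a member with a real assignment, which is essentially the shape the updated test ends up using (the partition values here are illustrative only):
   
   ```
   import java.util.Optional
   import org.apache.kafka.clients.admin.{MemberAssignment, MemberDescription}
   import org.apache.kafka.common.TopicPartition
   import scala.jdk.CollectionConverters._
   
   // A member that actually owns partitions, so collectGroupsOffsets walks the
   // assignedTopicPartitions branch instead of only the unassignedPartitions one.
   val assigned = Set(new TopicPartition("testTopic1", 0), new TopicPartition("testTopic1", 1))
   val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1",
     new MemberAssignment(assigned.asJava))
   ```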




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r660172772



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava))
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    val returnedOffsets = assignments.map { results =>
+      results.map { assignment =>
+        new TopicPartition(assignment.topic.get, assignment.partition.get) -> assignment.offset
+      }.toMap
+    }.getOrElse(Map.empty)
+    // Results should have information for all assigned topic partition (even if there is not Offset's information at all, because they get fills with None)
+    // Results should have information only for unassigned topic partitions if and only if there is information about them (including with null values)

Review comment:
       You are right, it's not relevant because all partitions have information now. Going to remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r659039997



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
       Actually yes, it's because not all the end offsets are returned all the time now, but only those for the unassigned partitions case.
   
   Because the ConsumerGroupCommand defines unassigned topic partition as:
   `val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }`
   
   The definition that I gave in the test breaks that logic, because if a topic partition is not in commitedOffsets, by definition it shouldn't be in unassignedPartitions.
   The test, as I primarily defined it, declares testTopicPartition3 as an unassigned partition, but that partition is not also defined in commitedOffsets, so it does not respect the previous statement `val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }`. That is the reason the test is failing.
   
   So the test case should be:
   ```
   val commitedOffsets = Map(
         testTopicPartition1 -> new OffsetAndMetadata(100),
         testTopicPartition2 -> null,
         testTopicPartition3 -> new OffsetAndMetadata(100),
         testTopicPartition4 -> new OffsetAndMetadata(100),
         testTopicPartition5 -> null,
       ).asJava
   ```
   Only testTopicPartition0 (which is assigned) is left undefined, because as I see it there is never going to be a case where an unassigned partition is undefined: being defined in commitedOffsets is a requirement for a partition to be considered unassigned.
   On the other hand, there can be assigned but not committed partitions.
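   To make that concrete, a small sketch of how the filter resolves with the values above (placeholder offset values, tp0..tp5 standing in for the six test partitions):
   
   ```
   import org.apache.kafka.common.TopicPartition
   
   // Sketch: only partitions present in committedOffsets can end up unassigned.
   val tp0 = new TopicPartition("testTopic1", 0) // assigned, but no committed offset
   val tp1 = new TopicPartition("testTopic1", 1)
   val tp2 = new TopicPartition("testTopic1", 2)
   val tp3 = new TopicPartition("testTopic2", 0)
   val tp4 = new TopicPartition("testTopic2", 1)
   val tp5 = new TopicPartition("testTopic2", 2)
   
   val committedOffsets = Map(tp1 -> 100L, tp2 -> -1L, tp3 -> 100L, tp4 -> 100L, tp5 -> -1L) // tp0 absent
   val assignedTopicPartitions = Set(tp0, tp1, tp2)
   
   val unassignedPartitions = committedOffsets.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }
   // unassignedPartitions.keySet == Set(tp3, tp4, tp5); tp0 never appears here because it has no committed offset.
   ```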

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
       Makes sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r660172772



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava))
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    val returnedOffsets = assignments.map { results =>
+      results.map { assignment =>
+        new TopicPartition(assignment.topic.get, assignment.partition.get) -> assignment.offset
+      }.toMap
+    }.getOrElse(Map.empty)
+    // Results should have information for all assigned topic partition (even if there is not Offset's information at all, because they get fills with None)
+    // Results should have information only for unassigned topic partitions if and only if there is information about them (including with null values)

Review comment:
       You are right, it's not relevant because all partitions have information now. Going to remove it.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava))

Review comment:
       👍 

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))

Review comment:
       Sure, will add 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658993775



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
       Ran the test defining all the topic partitions with some value, but it failed too.
   
   The stack trace:
   ```
   ConsumerGroupServiceTest > testAdminRequestsForDescribeNegativeOffsets() FAILED
       java.lang.NullPointerException
           at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:638)
           at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:411)
           at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
           at scala.collection.mutable.Growable.addAll(Growable.scala:62)
           at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
           at scala.collection.mutable.HashMap.addAll(HashMap.scala:117)
           at scala.collection.mutable.HashMap$.from(HashMap.scala:589)
           at scala.collection.mutable.HashMap$.from(HashMap.scala:582)
           at scala.collection.MapOps$WithFilter.map(Map.scala:381)
           at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:560)
           at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:551)
           at kafka.admin.ConsumerGroupServiceTest.testAdminRequestsForDescribeNegativeOffsets(ConsumerGroupServiceTest.scala:134)
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652242501



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(listGroupNegativeOffsetsResult)
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(listOffsetsResult)
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
       Yes, i could add that kind of assertions. 
   Encounter something interesting that we may need to improve on the original test. This is the original function (AS-IS) of describing the ConsumerGroup:
   
   ```
   private def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
       val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", null)
       val description = new ConsumerGroupDescription(group,
         true,
         Collections.singleton(member1),
         classOf[RangeAssignor].getName,
         groupState,
         new Node(1, "localhost", 9092))
       new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
     }
     ```
     
    As you can see, the last parameter of the member1 declaration is _null_. That is the assignment of partitions to the consumer, and it is always set to null, so no partition is ever assigned to the consumer group. For example, this is the printout of the consumerGroups from _testAdminRequestsForDescribeOffsets_ (with no changes so far):
 
 (groupId=testGroup, isSimpleConsumerGroup=true, members=(memberId=member1, groupInstanceId=instance1, clientId=client1, host=host1, assignment=(topicPartitions=)), partitionAssignor=org.apache.kafka.clients.consumer.RangeAssignor, state=Stable, coordinator=localhost:9092 (id: 1 rack: null), authorizedOperations=[])
   
   That is the result of `val consumerGroups = describeConsumerGroups(groupIds)` in ConsumerGroupCommand's collectGroupsOffsets function for the regular test. As you can see, no partitions are assigned to the consumer: `assignment=(topicPartitions=)`
   
   So the regular test skips the _assignedTopicPartitions_ part and goes directly to _unassignedPartitions_, which means it only exercises unassignedPartitions. 
   
   Do you think this is something worth improving?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658996808



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
       You might need to adjust the expected arguments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658571696



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava

Review comment:
       Actually, it seems that we should always have `null` or an `OffsetAndMetadata` here for each partition, as the API always provides an answer for the requested partitions.
   
   `commitedOffsets` -> `committedOffsets`
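   
   A sketch of that first point (illustrative values only): every requested partition gets an answer, either a real `OffsetAndMetadata` or `null`:
   
   ```
   // Hypothetical map for illustration; no partition is left without an entry.
   val committedOffsets = Map(
     testTopicPartition0 -> null,
     testTopicPartition1 -> new OffsetAndMetadata(100),
     testTopicPartition2 -> null,
     testTopicPartition3 -> new OffsetAndMetadata(100),
     testTopicPartition4 -> new OffsetAndMetadata(100),
     testTopicPartition5 -> null,
   ).asJava
   ```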

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))

Review comment:
       I was not fully satisfied with providing all the `endOffsets` in all cases here, so I took a bit of time to play with Mockito. I was able to make it work as follows:
   
   ```
       def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
         topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
       }
   
       when(admin.listOffsets(
         ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
         any()
       )).thenReturn(new ListOffsetsResult(endOffsets.view.filterKeys(assignedTopicPartitions.contains).toMap.asJava))
   
       when(admin.listOffsets(
         ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
         any()
       )).thenReturn(new ListOffsetsResult(endOffsets.view.filterKeys(unassignedTopicPartitions.contains).toMap.asJava))
   ```
   
   Note the `topicPartitionOffsets != null` condition. It seems that Mockito calls the argument matcher once with `null` somehow.
   
   Interestingly, the test fails when I use this modified version. I think this is because not all the end offsets are returned anymore and `committedOffsets` must therefore contain all the partitions. See my previous comment about this.
   
   What do you think?

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet || map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(new ListOffsetsResult(endOffsets.asJava))
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    val returnedOffsets = assignments.map { results =>
+      results.map { assignment =>
+        new TopicPartition(assignment.topic.get, assignment.partition.get) -> assignment.offset
+      }.toMap
+    }.getOrElse(Map.empty)
+    // Results should have information for all assigned topic partition (even if there is not Offset's information at all, because they get fills with None)
+    // Results should have information only for unassigned topic partitions if and only if there is information about them (including with null values)
+    val expectedOffsets = Map(
+      testTopicPartition0 -> None,
+      testTopicPartition1 -> Some(100),
+      testTopicPartition2 -> None,
+      testTopicPartition4 -> Some(100),
+      testTopicPartition5 -> None
+    )
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)

Review comment:
       This assertion is not needed anymore. It is implicitly checked by the next one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-867206422


   Hi @dajac, thanks again for your comments.
   I've updated the PR taking them into consideration.
   What do you think?
   Best regards


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-870107728


   Thanks @dajac for the review and comments!
   Updated the PR taking them into account.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657960159



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any())
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any())

Review comment:
       Following the stack with some printlns, it is in fact not Mockito throwing a NullPointerException: on the second call it doesn't recognize the argument matcher, so it invokes the method as a normal, unstubbed call. That unstubbed invocation returns null (because there is no stubbed data).
   
   My guess is that Mockito doesn't register the second ArgumentMatcher, and for that reason doesn't return the expected values.
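   
   A standalone sketch of that default behaviour (a hypothetical mock, not the test code): when no stub matches a call, Mockito falls back to its default answer, which is `null` for reference types.
   
   ```
   import org.mockito.Mockito.{mock, when}
   
   val mockedMap = mock(classOf[java.util.Map[String, String]])
   when(mockedMap.get("stubbed")).thenReturn("value")
   
   mockedMap.get("stubbed")   // "value": the stub matched
   mockedMap.get("unstubbed") // null: no stub matched, so the default answer applies
   ```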




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r660172970



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +63,88 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition3 -> new OffsetAndMetadata(100),
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2)
+    val unassignedTopicPartitions = Set(testTopicPartition3, testTopicPartition4, testTopicPartition5)
+
+    val consumerGroupDescription = new ConsumerGroupDescription(group,
+      true,
+      Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))),
+      classOf[RangeAssignor].getName,
+      ConsumerGroupState.STABLE,
+      new Node(1, "localhost", 9092))
+
+    def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]): ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
+      topicPartitionOffsets => topicPartitionOffsets != null && topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
+    }
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(consumerGroupDescription))))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.asJava))
+    when(admin.listOffsets(
+      ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
+      any()
+    )).thenReturn(new ListOffsetsResult(endOffsets.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.asJava))

Review comment:
       👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658978444



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava

Review comment:
       Actually, I put that case in based on what I saw on that consumer group:
   
   This was the consumer-group state:
   `(groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])`
   
    The member has all of rtl_orderReceive's partitions assigned, but when getting the committed offsets:
   
   `Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)`
   
    Even though partition 6 was assigned, there are no committed values for it (not even a -1).
    That covers the case of assigned partitions. For unassigned partitions, thinking about it now, since they are the subset of committed offsets that aren't assigned, it makes no sense to have an undefined committed offset for an unassigned partition (if a partition had no committed offset, it would never show up as unassigned in the first place).
   
    So I am thinking of:
   
   ```
   val commitedOffsets = Map(
         testTopicPartition1 -> new OffsetAndMetadata(100),
         testTopicPartition2 -> null,
         testTopicPartition3 -> new OffsetAndMetadata(100),
         testTopicPartition4 -> new OffsetAndMetadata(100),
         testTopicPartition5 -> null,
       ).asJava
   ```
   
    Just leaving testTopicPartition0 undefined, but defining testTopicPartition3 (unassigned).
    Does that make sense?
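    
    In other words, a sketch (names as in the test): the unassigned set is derived from the committed offsets themselves, so an unassigned partition without a committed entry cannot occur.
    
    ```
    val unassignedTopicPartitions =
      committedOffsets.asScala.keySet.filterNot(assignedTopicPartitions.contains)
    ```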
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-868855492


   A compileTestScala error arose. Looking into it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] IgnacioAcunaF edited a comment on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
IgnacioAcunaF edited a comment on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-863611362


   @dajac Updated the PR. Passed the param directly to _collectConsumerAssignment_ as suggested, which makes the code cleaner. Also added a commit to fix an error I encountered in the ARM compilation of the new unit test (from Mockito).
   
   Tests run just fine:
   ![image](https://user-images.githubusercontent.com/31544929/122483784-594f8500-cfa1-11eb-95e9-b0330aefdc3d.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658995349



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    // Some topic's partitions gets valid OffsetAndMetada values, other gets nulls values (negative integers) and others aren't defined
+    val commitedOffsets = Map(
+      testTopicPartition1 -> new OffsetAndMetadata(100),
+      testTopicPartition2 -> null,
+      testTopicPartition4 -> new OffsetAndMetadata(100),
+      testTopicPartition5 -> null,
+    ).asJava

Review comment:
       Thanks for the clarification. In this case, it might be better to keep committed offsets in both cases in order to be robust, and to adjust the rest of the test accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657874082



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned topic partition
+      // testTopicPartition3 ->  there is no offset information for an unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any())
+    doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any())

Review comment:
       Ah... Interesting. I did not know about this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org