You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/02/08 00:54:17 UTC

[kafka] branch 2.3 updated: KAFKA-9507; AdminClient should check for missing committed offsets (#8057)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 33e899d  KAFKA-9507; AdminClient should check for missing committed offsets (#8057)
33e899d is described below

commit 33e899d5e1542eb292dbf45b5a0cd71921d113e8
Author: David Mao <47...@users.noreply.github.com>
AuthorDate: Fri Feb 7 16:43:52 2020 -0800

    KAFKA-9507; AdminClient should check for missing committed offsets (#8057)
    
    Addresses exception being thrown by `AdminClient` when `listConsumerGroupOffsets` returns a negative offset. A negative offset indicates the absence of a committed offset for a requested partition, and should result in a null in the returned offset map.
    
    Reviewers: Anna Povzner <an...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 7 ++++++-
 .../apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java | 1 +
 .../java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java  | 7 ++++++-
 3 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 005ed6b..da90169 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2833,7 +2833,12 @@ public class KafkaAdminClient extends AdminClient {
                                 final Long offset = partitionData.offset;
                                 final String metadata = partitionData.metadata;
                                 final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
-                                groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+                                // Negative offset indicates that the group has no committed offset for this partition
+                                if (offset < 0) {
+                                    groupOffsetsListing.put(topicPartition, null);
+                                } else {
+                                    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+                                }
                             } else {
                                 log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
                             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
index 23657b5..17dc481 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -40,6 +40,7 @@ public class ListConsumerGroupOffsetsResult {
 
     /**
      * Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
+     * If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
      */
     public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
         return future;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6bbd87a..47eb9c4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1315,6 +1315,7 @@ public class KafkaAdminClientTest {
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
             TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
+            TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
 
             final Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
             responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10,
@@ -1323,15 +1324,19 @@ public class KafkaAdminClientTest {
                     Optional.empty(), "", Errors.NONE));
             responseData.put(myTopicPartition2, new OffsetFetchResponse.PartitionData(20,
                     Optional.empty(), "", Errors.NONE));
+            responseData.put(myTopicPartition3, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+                    Optional.empty(), "", Errors.NONE));
             env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));
 
             final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0");
             final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();
 
-            assertEquals(3, partitionToOffsetAndMetadata.size());
+            assertEquals(4, partitionToOffsetAndMetadata.size());
             assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0).offset());
             assertEquals(0, partitionToOffsetAndMetadata.get(myTopicPartition1).offset());
             assertEquals(20, partitionToOffsetAndMetadata.get(myTopicPartition2).offset());
+            assertTrue(partitionToOffsetAndMetadata.containsKey(myTopicPartition3));
+            assertNull(partitionToOffsetAndMetadata.get(myTopicPartition3));
         }
     }