Posted to commits@kafka.apache.org by ca...@apache.org on 2024/02/21 17:19:44 UTC

(kafka) branch trunk updated: KAFKA-16194: Do not return records from poll if group metadata unknown (#15369)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02ebfc6108c KAFKA-16194: Do not return records from poll if group metadata unknown (#15369)
02ebfc6108c is described below

commit 02ebfc6108c86d8f7c74396f440ae5336e0bcae2
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Wed Feb 21 18:19:38 2024 +0100

    KAFKA-16194: Do not return records from poll if group metadata unknown (#15369)
    
    Due to the asynchronous nature of the async consumer, it might happen that the group metadata is not yet known on the application thread when the first poll returns records. If the offsets of those records are then sent to a transaction with
    
    txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
    
    and the transaction is then committed, the group coordinator will raise an error saying that the member is not known, since the generation ID in groupMetadata is still -1 (and the member ID unknown) before the metadata is updated.
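    
    For context, the failure typically surfaces in a read-process-write loop like the following minimal sketch. It is illustrative only: the "output" topic, the offsetsToCommit bookkeeping, and the already-constructed transactional consumer and txnProducer (plus imports) are assumptions, not part of this commit.
    
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            txnProducer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                txnProducer.send(new ProducerRecord<>("output", record.key(), record.value()));
                offsetsToCommit.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            }
            // If poll() handed back records before the group metadata was updated,
            // consumer.groupMetadata() still reports an unknown member ID and a
            // generation ID of -1, and the group coordinator rejects the commit.
            txnProducer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
            txnProducer.commitTransaction();
        }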
    
    This commit avoids this error by not returning any records from poll() until the group metadata has been updated, i.e., until both the member ID and the generation ID (a.k.a. member epoch) are known. The check only applies when group management is used.
    
    Additionally, this commit resets the group metadata when the consumer unsubscribes.
    
    Reviewer: Lucas Brutschy <lb...@confluent.io>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  24 +++-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 148 +++++++++++++++++++++
 2 files changed, 166 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d2e4788a1bc..d4b461b0140 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -712,14 +712,18 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 wakeupTrigger.maybeTriggerWakeup();
 
                 updateAssignmentMetadataIfNeeded(timer);
-                final Fetch<K, V> fetch = pollForFetches(timer);
-                if (!fetch.isEmpty()) {
-                    if (fetch.records().isEmpty()) {
-                        log.trace("Returning empty records from `poll()` "
+                if (isGenerationKnownOrPartitionsUserAssigned()) {
+                    final Fetch<K, V> fetch = pollForFetches(timer);
+                    if (!fetch.isEmpty()) {
+                        if (fetch.records().isEmpty()) {
+                            log.trace("Returning empty records from `poll()` "
                                 + "since the consumer's position has advanced for at least one topic partition");
-                    }
+                        }
 
-                    return interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
+                        return interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
+                    }
+                } else {
+                    timer.update();
                 }
                 // We will wait for retryBackoffMs
             } while (timer.notExpired());
@@ -731,6 +735,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }
 
+    private boolean isGenerationKnownOrPartitionsUserAssigned() {
+        if (subscriptions.hasAutoAssignedPartitions()) {
+            return groupMetadata.filter(g -> g.generationId() != JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent();
+        }
+        return true;
+    }
+
     /**
      * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
      * partitions.
@@ -1463,6 +1474,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 } catch (TimeoutException e) {
                     log.error("Failed while waiting for the unsubscribe event to complete");
                 }
+                groupMetadata = initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty());
             }
             subscriptions.unsubscribe();
         } finally {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 2db666e95e9..abd0f45c73f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeAppl
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.Node;
@@ -99,6 +100,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -1131,6 +1133,144 @@ public class AsyncKafkaConsumerTest {
         assertEquals(expectedGroupMetadata, secondActualGroupMetadataWithoutUpdate);
     }
 
+    @Test
+    public void testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsedWithTopics() {
+        testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(() -> {
+            consumer.subscribe(singletonList("topic"));
+        });
+    }
+
+    @Test
+    public void testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsedWithPattern() {
+        testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(() -> {
+            when(metadata.fetch()).thenReturn(Cluster.empty());
+            consumer.subscribe(Pattern.compile("topic"));
+        });
+    }
+
+    private void testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(final Runnable subscription) {
+        final String groupId = "consumerGroupA";
+        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        consumer = newConsumer(config);
+        subscription.run();
+
+        consumer.poll(Duration.ZERO);
+
+        verify(fetchCollector, never()).collectFetch(any(FetchBuffer.class));
+    }
+
+    @Test
+    public void testPollReturningRecordsIfGroupIdSetAndGroupManagementIsNotUsed() {
+        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId("consumerGroupA"));
+        testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(config);
+    }
+
+    @Test
+    public void testPollReturningRecordsIfGroupIdNotSetAndGroupManagementIsNotUsed() {
+        final ConsumerConfig config = new ConsumerConfig(requiredConsumerProperties());
+        testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(config);
+    }
+
+    private void testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(final ConsumerConfig config) {
+        final String topic = "topic";
+        final TopicPartition topicPartition = new TopicPartition(topic, 0);
+        consumer = newConsumer(config);
+        consumer.assign(singletonList(topicPartition));
+        final List<ConsumerRecord<String, String>> records = singletonList(
+            new ConsumerRecord<>(topic, 0, 2, "key1", "value1")
+        );
+        when(fetchCollector.collectFetch(any(FetchBuffer.class)))
+            .thenReturn(Fetch.forPartition(topicPartition, records, true));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap());
+
+        consumer.poll(Duration.ZERO);
+
+        verify(fetchCollector).collectFetch(any(FetchBuffer.class));
+    }
+
+    @Test
+    public void testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsedWithTopics() {
+        final String topic = "topic";
+        testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(
+            topic,
+            () -> {
+                consumer.subscribe(singletonList(topic));
+            });
+    }
+
+    @Test
+    public void testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsedWithPattern() {
+        final String topic = "topic";
+        testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(
+            topic,
+            () -> {
+                when(metadata.fetch()).thenReturn(Cluster.empty());
+                consumer.subscribe(Pattern.compile(topic));
+            });
+    }
+
+    private void testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(final String topic,
+                                                                                  final Runnable subscription) {
+        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId("consumerGroupA"));
+        final int generation = 1;
+        final String memberId = "newMemberId";
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+            generation,
+            memberId
+        );
+        backgroundEventQueue.add(groupMetadataUpdateEvent);
+        final TopicPartition topicPartition = new TopicPartition(topic, 0);
+        final List<ConsumerRecord<String, String>> records = singletonList(
+            new ConsumerRecord<>(topic, 0, 2, "key1", "value1")
+        );
+        when(fetchCollector.collectFetch(any(FetchBuffer.class)))
+            .thenReturn(Fetch.forPartition(topicPartition, records, true));
+        consumer = newConsumer(config);
+        subscription.run();
+
+        consumer.poll(Duration.ZERO);
+
+        verify(fetchCollector).collectFetch(any(FetchBuffer.class));
+    }
+
+    @Test
+    public void testGroupMetadataIsResetAfterUnsubscribe() {
+        final String groupId = "consumerGroupA";
+        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        consumer = newConsumer(config);
+        consumer.subscribe(singletonList("topic"));
+        final int generation = 1;
+        final String memberId = "newMemberId";
+        final ConsumerGroupMetadata groupMetadataAfterSubscription = new ConsumerGroupMetadata(
+            groupId,
+            generation,
+            memberId,
+            Optional.empty()
+        );
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+            generation,
+            memberId
+        );
+        backgroundEventQueue.add(groupMetadataUpdateEvent);
+        when(fetchCollector.collectFetch(any(FetchBuffer.class))).thenReturn(Fetch.empty());
+        consumer.poll(Duration.ZERO);
+
+        assertEquals(groupMetadataAfterSubscription, consumer.groupMetadata());
+
+        completeUnsubscribeApplicationEventSuccessfully();
+
+        consumer.unsubscribe();
+
+        final ConsumerGroupMetadata groupMetadataAfterUnsubscription = new ConsumerGroupMetadata(
+            groupId,
+            JoinGroupRequest.UNKNOWN_GENERATION_ID,
+            JoinGroupRequest.UNKNOWN_MEMBER_ID,
+            Optional.empty()
+        );
+
+        assertEquals(groupMetadataAfterUnsubscription, consumer.groupMetadata());
+    }
+
     /**
      * Tests that the consumer correctly invokes the callbacks for {@link ConsumerRebalanceListener} that was
      * specified. We don't go through the full effort to emulate heartbeats and correct group management here. We're
@@ -1321,6 +1461,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("topic", 0);
         final List<ConsumerRecord<String, String>> records = singletonList(
                 new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));
+        backgroundEventQueue.add(new GroupMetadataUpdateEvent(1, "memberId"));
         doAnswer(invocation -> Fetch.forPartition(tp, records, true))
                 .when(fetchCollector)
                 .collectFetch(Mockito.any(FetchBuffer.class));
@@ -1416,6 +1557,13 @@ public class AsyncKafkaConsumerTest {
             new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
         );
 
+        final int generation = 1;
+        final String memberId = "newMemberId";
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+            generation,
+            memberId
+        );
+        backgroundEventQueue.add(groupMetadataUpdateEvent);
         // On the first iteration, return no data; on the second, return two records
         doAnswer(invocation -> {
             // Mock the subscription being assigned as the first fetch is collected