You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/05/29 21:14:49 UTC

[kafka] 02/02: KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit dddd73fb2efdc2a0b1fa71c9946406b12886c1bf
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri May 29 17:03:49 2020 +0100

    KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/SubscriptionState.java      | 12 ++++++-
 .../internals/ConsumerCoordinatorTest.java         | 42 ++++++++++++++++++++++
 .../consumer/internals/SubscriptionStateTest.java  |  6 ++++
 3 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4d93a1b..86f4e68 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -345,7 +345,17 @@ public class SubscriptionState {
      *   of the current generation; otherwise it returns the same set as {@link #subscription()}
      */
     synchronized Set<String> metadataTopics() {
-        return groupSubscription.isEmpty() ? subscription : groupSubscription;
+        if (groupSubscription.isEmpty())
+            return subscription;
+        else if (groupSubscription.containsAll(subscription))
+            return groupSubscription;
+        else {
+            // When the subscription changes, `groupSubscription` may be outdated;
+            // ensure that the new subscription topics are returned.
+            Set<String> topics = new HashSet<>(groupSubscription);
+            topics.addAll(subscription);
+            return topics;
+        }
     }
 
     synchronized boolean needsMetadata(String topic) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index afba13b..d61010e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -783,6 +783,35 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testMetadataTopicsDuringSubscriptionChange() {
+        final String consumerId = "subscription_change";
+        final List<String> oldSubscription = singletonList(topic1);
+        final List<TopicPartition> oldAssignment = Collections.singletonList(t1p);
+        final List<String> newSubscription = singletonList(topic2);
+        final List<TopicPartition> newAssignment = Collections.singletonList(t2p);
+
+        subscriptions.subscribe(toSet(oldSubscription), rebalanceListener);
+        assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        prepareJoinAndSyncResponse(consumerId, 1, oldSubscription, oldAssignment);
+
+        coordinator.poll(time.timer(0));
+        assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());
+
+        subscriptions.subscribe(toSet(newSubscription), rebalanceListener);
+        assertEquals(Utils.mkSet(topic1, topic2), subscriptions.metadataTopics());
+
+        prepareJoinAndSyncResponse(consumerId, 2, newSubscription, newAssignment);
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
+        assertEquals(toSet(newSubscription), subscriptions.metadataTopics());
+    }
+
+    @Test
     public void testPatternJoinGroupLeader() {
         final String consumerId = "leader";
         final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
@@ -3107,6 +3136,19 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors), disconnected);
     }
 
+    private void prepareJoinAndSyncResponse(String consumerId, int generation, List<String> subscription, List<TopicPartition> assignment) {
+        partitionAssignor.prepare(singletonMap(consumerId, assignment));
+        client.prepareResponse(
+                joinGroupLeaderResponse(
+                        generation, consumerId, singletonMap(consumerId, subscription), Errors.NONE));
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == generation &&
+                    sync.groupAssignments().containsKey(consumerId);
+        }, syncGroupResponse(assignment, Errors.NONE));
+    }
+
     private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) {
         final Map<TopicPartition, Errors> errors = new HashMap<>();
         for (TopicPartition partition : partitions) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index cc74652..ae00b8e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -122,6 +122,12 @@ public class SubscriptionStateTest {
         // `groupSubscribe` does not accumulate
         assertFalse(state.groupSubscribe(singleton(topic1)));
         assertEquals(singleton(topic1), state.metadataTopics());
+
+        state.subscribe(singleton("anotherTopic"), rebalanceListener);
+        assertEquals(Utils.mkSet(topic1, "anotherTopic"), state.metadataTopics());
+
+        assertFalse(state.groupSubscribe(singleton("anotherTopic")));
+        assertEquals(singleton("anotherTopic"), state.metadataTopics());
     }
 
     @Test