You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/05/29 16:38:06 UTC

[kafka] branch 2.4 updated: KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 7dcfdcb  KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)
7dcfdcb is described below

commit 7dcfdcb70f3e90dac574ff4c879e7bd8592dd795
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri May 29 17:03:49 2020 +0100

    KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/SubscriptionState.java      | 12 ++++++-
 .../internals/ConsumerCoordinatorTest.java         | 42 ++++++++++++++++++++++
 .../consumer/internals/SubscriptionStateTest.java  |  6 ++++
 3 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index cdf358e..1102aca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -339,7 +339,17 @@ public class SubscriptionState {
      *   of the current generation; otherwise it returns the same set as {@link #subscription()}
      */
     synchronized Set<String> metadataTopics() {
-        return groupSubscription.isEmpty() ? subscription : groupSubscription;
+        if (groupSubscription.isEmpty())
+            return subscription;
+        else if (groupSubscription.containsAll(subscription))
+            return groupSubscription;
+        else {
+            // `groupSubscription` is missing topics from `subscription`, i.e. it is outdated after a
+            // subscription change: return the union so the new subscription topics are included.
+            Set<String> topics = new HashSet<>(groupSubscription);
+            topics.addAll(subscription);
+            return topics;
+        }
     }
 
     synchronized boolean needsMetadata(String topic) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index afef951..6d818df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -644,6 +644,35 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testMetadataTopicsDuringSubscriptionChange() {
+        final String consumerId = "subscription_change";
+        final List<String> oldSubscription = singletonList(topic1);
+        final List<TopicPartition> oldAssignment = Collections.singletonList(t1p);
+        final List<String> newSubscription = singletonList(topic2);
+        final List<TopicPartition> newAssignment = Collections.singletonList(t2p);
+        // Before any group join, metadataTopics() is expected to equal the local subscription.
+        subscriptions.subscribe(toSet(oldSubscription), rebalanceListener);
+        assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());
+        // Queue a coordinator lookup response and wait until the coordinator is ready.
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        // Queue join and sync responses for generation 1 carrying the old subscription/assignment.
+        prepareJoinAndSyncResponse(consumerId, 1, oldSubscription, oldAssignment);
+        // After this poll, metadataTopics() must still equal the old subscription.
+        coordinator.poll(time.timer(0));
+        assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());
+        // Change the subscription before rejoining: both the old and the new topic must be reported.
+        subscriptions.subscribe(toSet(newSubscription), rebalanceListener);
+        assertEquals(Utils.mkSet(topic1, topic2), subscriptions.metadataTopics());
+        // Rejoin as generation 2; afterwards only the new assignment and subscription remain.
+        prepareJoinAndSyncResponse(consumerId, 2, newSubscription, newAssignment);
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
+        assertEquals(toSet(newSubscription), subscriptions.metadataTopics());
+    }
+
+    @Test
     public void testPatternJoinGroupLeader() {
         final String consumerId = "leader";
         final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
@@ -2726,6 +2755,19 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors), disconnected);
     }
 
+    private void prepareJoinAndSyncResponse(String consumerId, int generation, List<String> subscription, List<TopicPartition> assignment) {
+        partitionAssignor.prepare(singletonMap(consumerId, assignment)); // NOTE(review): presumably primes the test assignor's result — confirm
+        client.prepareResponse( // queued first: the join-group response for the given generation and member
+                joinGroupLeaderResponse(
+                        generation, consumerId, singletonMap(consumerId, subscription), Errors.NONE));
+        client.prepareResponse(body -> { // queued second: used only if the sync request passes the checks below
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == generation &&
+                    sync.groupAssignments().containsKey(consumerId);
+        }, syncGroupResponse(assignment, Errors.NONE));
+    }
+
     private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) {
         final Map<TopicPartition, Errors> errors = new HashMap<>();
         for (TopicPartition partition : partitions) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 47d654e..58d8b88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -119,6 +119,12 @@ public class SubscriptionStateTest {
         // `groupSubscribe` does not accumulate
         assertFalse(state.groupSubscribe(singleton(topic1)));
         assertEquals(singleton(topic1), state.metadataTopics());
+
+        state.subscribe(singleton("anotherTopic"), rebalanceListener);
+        assertEquals(Utils.mkSet(topic1, "anotherTopic"), state.metadataTopics());
+
+        assertFalse(state.groupSubscribe(singleton("anotherTopic")));
+        assertEquals(singleton("anotherTopic"), state.metadataTopics());
     }
 
     @Test