You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/05/29 21:14:49 UTC
[kafka] 02/02: KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit dddd73fb2efdc2a0b1fa71c9946406b12886c1bf
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri May 29 17:03:49 2020 +0100
KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/SubscriptionState.java | 12 ++++++-
.../internals/ConsumerCoordinatorTest.java | 42 ++++++++++++++++++++++
.../consumer/internals/SubscriptionStateTest.java | 6 ++++
3 files changed, 59 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4d93a1b..86f4e68 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -345,7 +345,17 @@ public class SubscriptionState {
* of the current generation; otherwise it returns the same set as {@link #subscription()}
*/
synchronized Set<String> metadataTopics() {
- return groupSubscription.isEmpty() ? subscription : groupSubscription;
+ if (groupSubscription.isEmpty())
+ return subscription;
+ else if (groupSubscription.containsAll(subscription))
+ return groupSubscription;
+ else {
+ // When subscription changes `groupSubscription` may be outdated, ensure that
+ // new subscription topics are returned.
+ Set<String> topics = new HashSet<>(groupSubscription);
+ topics.addAll(subscription);
+ return topics;
+ }
}
synchronized boolean needsMetadata(String topic) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index afba13b..d61010e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -783,6 +783,35 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testMetadataTopicsDuringSubscriptionChange() {
+ final String consumerId = "subscription_change";
+ final List<String> oldSubscription = singletonList(topic1);
+ final List<TopicPartition> oldAssignment = Collections.singletonList(t1p);
+ final List<String> newSubscription = singletonList(topic2);
+ final List<TopicPartition> newAssignment = Collections.singletonList(t2p);
+
+ subscriptions.subscribe(toSet(oldSubscription), rebalanceListener);
+ assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());
+
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ prepareJoinAndSyncResponse(consumerId, 1, oldSubscription, oldAssignment);
+
+ coordinator.poll(time.timer(0));
+ assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());
+
+ subscriptions.subscribe(toSet(newSubscription), rebalanceListener);
+ assertEquals(Utils.mkSet(topic1, topic2), subscriptions.metadataTopics());
+
+ prepareJoinAndSyncResponse(consumerId, 2, newSubscription, newAssignment);
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertFalse(coordinator.rejoinNeededOrPending());
+ assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
+ assertEquals(toSet(newSubscription), subscriptions.metadataTopics());
+ }
+
+ @Test
public void testPatternJoinGroupLeader() {
final String consumerId = "leader";
final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
@@ -3107,6 +3136,19 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors), disconnected);
}
+ private void prepareJoinAndSyncResponse(String consumerId, int generation, List<String> subscription, List<TopicPartition> assignment) {
+ partitionAssignor.prepare(singletonMap(consumerId, assignment));
+ client.prepareResponse(
+ joinGroupLeaderResponse(
+ generation, consumerId, singletonMap(consumerId, subscription), Errors.NONE));
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == generation &&
+ sync.groupAssignments().containsKey(consumerId);
+ }, syncGroupResponse(assignment, Errors.NONE));
+ }
+
private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) {
final Map<TopicPartition, Errors> errors = new HashMap<>();
for (TopicPartition partition : partitions) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index cc74652..ae00b8e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -122,6 +122,12 @@ public class SubscriptionStateTest {
// `groupSubscribe` does not accumulate
assertFalse(state.groupSubscribe(singleton(topic1)));
assertEquals(singleton(topic1), state.metadataTopics());
+
+ state.subscribe(singleton("anotherTopic"), rebalanceListener);
+ assertEquals(Utils.mkSet(topic1, "anotherTopic"), state.metadataTopics());
+
+ assertFalse(state.groupSubscribe(singleton("anotherTopic")));
+ assertEquals(singleton("anotherTopic"), state.metadataTopics());
}
@Test