You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/02/11 18:12:17 UTC
[kafka] branch 2.3 updated: KAFKA-9181;
Maintain clean separation between local and group subscriptions in
consumer's SubscriptionState (#7941) (#8084)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new c7015ac KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (#7941) (#8084)
c7015ac is described below
commit c7015ac991719f970094720a6f3060d7d2abd002
Author: Vikas Singh <vi...@confluent.io>
AuthorDate: Tue Feb 11 10:11:31 2020 -0800
KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (#7941) (#8084)
Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
(cherry picked from commit a565d1a182cc69c9994c4512b5e9877e97f06cdf)
Co-authored-by: Rajini Sivaram <ra...@googlemail.com>
---
checkstyle/suppressions.xml | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 2 +-
.../consumer/internals/ConsumerMetadata.java | 4 +-
.../consumer/internals/SubscriptionState.java | 22 ++++------
.../internals/ConsumerCoordinatorTest.java | 48 ++++++++++++++++++++--
.../consumer/internals/ConsumerMetadataTest.java | 5 ++-
6 files changed, 60 insertions(+), 23 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6b1cccc..e064723 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -54,7 +54,7 @@
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator|AbstractCoordinator).java"/>
<suppress checks="JavaNCSS"
- files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java"/>
+ files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest).java"/>
<suppress checks="NPathComplexity"
files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 876a51d..7ed4086 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1033,7 +1033,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
Map<String, Integer> partitionsPerTopic = new HashMap<>();
- for (String topic : subscription.groupSubscription()) {
+ for (String topic : subscription.metadataTopics()) {
Integer numPartitions = cluster.partitionCountForTopic(topic);
if (numPartitions != null)
partitionsPerTopic.put(topic, numPartitions);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index fbdf1c6..ef7d924 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -55,7 +55,7 @@ public class ConsumerMetadata extends Metadata {
if (subscription.hasPatternSubscription())
return MetadataRequest.Builder.allTopics();
List<String> topics = new ArrayList<>();
- topics.addAll(subscription.groupSubscription());
+ topics.addAll(subscription.metadataTopics());
topics.addAll(transientTopics);
return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
}
@@ -72,7 +72,7 @@ public class ConsumerMetadata extends Metadata {
@Override
protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
- if (transientTopics.contains(topic) || subscription.isGroupSubscribed(topic))
+ if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
return true;
if (isInternal && !includeInternalTopics)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ae15f2f..f4f4d08 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -156,12 +156,6 @@ public class SubscriptionState {
return false;
subscription = topicsToSubscribe;
- if (subscriptionType != SubscriptionType.USER_ASSIGNED) {
- groupSubscription = new HashSet<>(groupSubscription);
- groupSubscription.addAll(topicsToSubscribe);
- } else {
- groupSubscription = new HashSet<>(topicsToSubscribe);
- }
return true;
}
@@ -181,7 +175,7 @@ public class SubscriptionState {
* Reset the group's subscription to only contain topics subscribed by this consumer.
*/
synchronized void resetGroupSubscription() {
- groupSubscription = subscription;
+ groupSubscription = Collections.emptySet();
}
/**
@@ -299,9 +293,9 @@ public class SubscriptionState {
}
/**
- * Get the subscription for the group. For the leader, this will include the union of the
- * subscriptions of all group members. For followers, it is just that member's subscription.
- * This is used when querying topic metadata to detect the metadata changes which would
+ * Get the subcription topics for which metadata is required . For the leader, this will include
+ * the union of the subscriptions of all group members. For followers, it is just that member's
+ * subscription. This is used when querying topic metadata to detect the metadata changes which would
* require rebalancing. The leader fetches metadata for all topics in the group so that it
* can do the partition assignment (which requires at least partition counts for all topics
* to be assigned).
@@ -309,12 +303,12 @@ public class SubscriptionState {
* @return The union of all subscribed topics in the group if this member is the leader
* of the current generation; otherwise it returns the same set as {@link #subscription()}
*/
- synchronized Set<String> groupSubscription() {
- return this.groupSubscription;
+ synchronized Set<String> metadataTopics() {
+ return groupSubscription.isEmpty() ? subscription : groupSubscription;
}
- synchronized boolean isGroupSubscribed(String topic) {
- return groupSubscription.contains(topic);
+ synchronized boolean needsMetadata(String topic) {
+ return subscription.contains(topic) || groupSubscription.contains(topic);
}
private TopicPartitionState assignedState(TopicPartition tp) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b9e8883..b952354 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -430,7 +430,7 @@ public class ConsumerCoordinatorTest {
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(singleton(t1p), subscriptions.assignedPartitions());
- assertEquals(singleton(topic1), subscriptions.groupSubscription());
+ assertEquals(singleton(topic1), subscriptions.metadataTopics());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(Collections.emptySet(), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
@@ -488,7 +488,7 @@ public class ConsumerCoordinatorTest {
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(singleton(t1p), subscriptions.assignedPartitions());
- assertEquals(singleton(topic1), subscriptions.groupSubscription());
+ assertEquals(singleton(topic1), subscriptions.metadataTopics());
assertEquals(2, rebalanceListener.revokedCount);
assertEquals(Collections.emptySet(), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
@@ -555,7 +555,7 @@ public class ConsumerCoordinatorTest {
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(2, subscriptions.numAssignedPartitions());
- assertEquals(2, subscriptions.groupSubscription().size());
+ assertEquals(2, subscriptions.metadataTopics().size());
assertEquals(2, subscriptions.subscription().size());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(Collections.emptySet(), rebalanceListener.revoked);
@@ -737,7 +737,7 @@ public class ConsumerCoordinatorTest {
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(singleton(t1p), subscriptions.assignedPartitions());
- assertEquals(singleton(topic1), subscriptions.groupSubscription());
+ assertEquals(singleton(topic1), subscriptions.metadataTopics());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(Collections.emptySet(), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
@@ -1059,6 +1059,7 @@ public class ConsumerCoordinatorTest {
coordinator.poll(time.timer(Long.MAX_VALUE));
// the metadata update should trigger a second rebalance
+ subscriptions.groupSubscribe(topics);
client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE));
@@ -1068,6 +1069,45 @@ public class ConsumerCoordinatorTest {
assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
}
+ /**
+ * Verifies that subscription change updates SubscriptionState correctly even after JoinGroup failures
+ * that don't re-invoke onJoinPrepare.
+ */
+ @Test
+ public void testSubscriptionChangeWithAuthorizationFailure() {
+ final String consumerId = "consumer";
+
+ // Subscribe to two topics of which only one is authorized and verify that metadata failure is propagated.
+ subscriptions.subscribe(Utils.mkSet(topic1, topic2), rebalanceListener);
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+ Collections.singletonMap(topic2, Errors.TOPIC_AUTHORIZATION_FAILED), singletonMap(topic1, 1)));
+ assertThrows(TopicAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
+
+ client.respond(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ // Fail the first JoinGroup request
+ client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.emptyMap(),
+ Errors.GROUP_AUTHORIZATION_FAILED));
+ assertThrows(GroupAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
+
+ // Change subscription to include only the authorized topic. Complete rebalance and check that
+ // references to topic2 have been removed from SubscriptionState.
+ subscriptions.subscribe(Utils.mkSet(topic1), rebalanceListener);
+ assertEquals(Collections.singleton(topic1), subscriptions.metadataTopics());
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+ Collections.emptyMap(), singletonMap(topic1, 1)));
+
+ Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+ partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p)));
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
+ client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+
+ assertEquals(singleton(topic1), subscriptions.subscription());
+ assertEquals(singleton(topic1), subscriptions.metadataTopics());
+ }
+
@Test
public void testWakeupFromAssignmentCallback() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 33d102d..b373192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -102,8 +102,11 @@ public class ConsumerMetadataTest {
@Test
public void testNormalSubscription() {
subscription.subscribe(Utils.mkSet("foo", "bar", "__consumer_offsets"), new NoOpConsumerRebalanceListener());
- subscription.groupSubscribe(Utils.mkSet("baz"));
+ subscription.groupSubscribe(Utils.mkSet("baz", "foo", "bar", "__consumer_offsets"));
testBasicSubscription(Utils.mkSet("foo", "bar", "baz"), Utils.mkSet("__consumer_offsets"));
+
+ subscription.resetGroupSubscription();
+ testBasicSubscription(Utils.mkSet("foo", "bar"), Utils.mkSet("__consumer_offsets"));
}
@Test