You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/02/11 18:12:17 UTC

[kafka] branch 2.3 updated: KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (#7941) (#8084)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new c7015ac  KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (#7941) (#8084)
c7015ac is described below

commit c7015ac991719f970094720a6f3060d7d2abd002
Author: Vikas Singh <vi...@confluent.io>
AuthorDate: Tue Feb 11 10:11:31 2020 -0800

    KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (#7941) (#8084)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
    (cherry picked from commit a565d1a182cc69c9994c4512b5e9877e97f06cdf)
    
    Co-authored-by: Rajini Sivaram <ra...@googlemail.com>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../consumer/internals/ConsumerCoordinator.java    |  2 +-
 .../consumer/internals/ConsumerMetadata.java       |  4 +-
 .../consumer/internals/SubscriptionState.java      | 22 ++++------
 .../internals/ConsumerCoordinatorTest.java         | 48 ++++++++++++++++++++--
 .../consumer/internals/ConsumerMetadataTest.java   |  5 ++-
 6 files changed, 60 insertions(+), 23 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6b1cccc..e064723 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -54,7 +54,7 @@
               files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator|AbstractCoordinator).java"/>
 
     <suppress checks="JavaNCSS"
-              files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java"/>
+              files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest).java"/>
 
     <suppress checks="NPathComplexity"
               files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 876a51d..7ed4086 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1033,7 +1033,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
             Map<String, Integer> partitionsPerTopic = new HashMap<>();
-            for (String topic : subscription.groupSubscription()) {
+            for (String topic : subscription.metadataTopics()) {
                 Integer numPartitions = cluster.partitionCountForTopic(topic);
                 if (numPartitions != null)
                     partitionsPerTopic.put(topic, numPartitions);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index fbdf1c6..ef7d924 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -55,7 +55,7 @@ public class ConsumerMetadata extends Metadata {
         if (subscription.hasPatternSubscription())
             return MetadataRequest.Builder.allTopics();
         List<String> topics = new ArrayList<>();
-        topics.addAll(subscription.groupSubscription());
+        topics.addAll(subscription.metadataTopics());
         topics.addAll(transientTopics);
         return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
     }
@@ -72,7 +72,7 @@ public class ConsumerMetadata extends Metadata {
 
     @Override
     protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
-        if (transientTopics.contains(topic) || subscription.isGroupSubscribed(topic))
+        if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
             return true;
 
         if (isInternal && !includeInternalTopics)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ae15f2f..f4f4d08 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -156,12 +156,6 @@ public class SubscriptionState {
             return false;
 
         subscription = topicsToSubscribe;
-        if (subscriptionType != SubscriptionType.USER_ASSIGNED) {
-            groupSubscription = new HashSet<>(groupSubscription);
-            groupSubscription.addAll(topicsToSubscribe);
-        } else {
-            groupSubscription = new HashSet<>(topicsToSubscribe);
-        }
         return true;
     }
 
@@ -181,7 +175,7 @@ public class SubscriptionState {
      * Reset the group's subscription to only contain topics subscribed by this consumer.
      */
     synchronized void resetGroupSubscription() {
-        groupSubscription = subscription;
+        groupSubscription = Collections.emptySet();
     }
 
     /**
@@ -299,9 +293,9 @@ public class SubscriptionState {
     }
 
     /**
-     * Get the subscription for the group. For the leader, this will include the union of the
-     * subscriptions of all group members. For followers, it is just that member's subscription.
-     * This is used when querying topic metadata to detect the metadata changes which would
+     * Get the subscription topics for which metadata is required. For the leader, this will include
+     * the union of the subscriptions of all group members. For followers, it is just that member's
+     * subscription. This is used when querying topic metadata to detect the metadata changes which would
      * require rebalancing. The leader fetches metadata for all topics in the group so that it
      * can do the partition assignment (which requires at least partition counts for all topics
      * to be assigned).
@@ -309,12 +303,12 @@ public class SubscriptionState {
      * @return The union of all subscribed topics in the group if this member is the leader
      *   of the current generation; otherwise it returns the same set as {@link #subscription()}
      */
-    synchronized Set<String> groupSubscription() {
-        return this.groupSubscription;
+    synchronized Set<String> metadataTopics() {
+        return groupSubscription.isEmpty() ? subscription : groupSubscription;
     }
 
-    synchronized boolean isGroupSubscribed(String topic) {
-        return groupSubscription.contains(topic);
+    synchronized boolean needsMetadata(String topic) {
+        return subscription.contains(topic) || groupSubscription.contains(topic);
     }
 
     private TopicPartitionState assignedState(TopicPartition tp) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b9e8883..b952354 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -430,7 +430,7 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
-        assertEquals(singleton(topic1), subscriptions.groupSubscription());
+        assertEquals(singleton(topic1), subscriptions.metadataTopics());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptySet(), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -488,7 +488,7 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
-        assertEquals(singleton(topic1), subscriptions.groupSubscription());
+        assertEquals(singleton(topic1), subscriptions.metadataTopics());
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(Collections.emptySet(), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -555,7 +555,7 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(2, subscriptions.numAssignedPartitions());
-        assertEquals(2, subscriptions.groupSubscription().size());
+        assertEquals(2, subscriptions.metadataTopics().size());
         assertEquals(2, subscriptions.subscription().size());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptySet(), rebalanceListener.revoked);
@@ -737,7 +737,7 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
-        assertEquals(singleton(topic1), subscriptions.groupSubscription());
+        assertEquals(singleton(topic1), subscriptions.metadataTopics());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptySet(), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -1059,6 +1059,7 @@ public class ConsumerCoordinatorTest {
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
         // the metadata update should trigger a second rebalance
+        subscriptions.groupSubscribe(topics);
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE));
 
@@ -1068,6 +1069,45 @@ public class ConsumerCoordinatorTest {
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
     }
 
+    /**
+     * Verifies that subscription change updates SubscriptionState correctly even after JoinGroup failures
+     * that don't re-invoke onJoinPrepare.
+     */
+    @Test
+    public void testSubscriptionChangeWithAuthorizationFailure() {
+        final String consumerId = "consumer";
+
+        // Subscribe to two topics of which only one is authorized and verify that metadata failure is propagated.
+        subscriptions.subscribe(Utils.mkSet(topic1, topic2), rebalanceListener);
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                Collections.singletonMap(topic2, Errors.TOPIC_AUTHORIZATION_FAILED), singletonMap(topic1, 1)));
+        assertThrows(TopicAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
+
+        client.respond(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Fail the first JoinGroup request
+        client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.emptyMap(),
+                Errors.GROUP_AUTHORIZATION_FAILED));
+        assertThrows(GroupAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
+
+        // Change subscription to include only the authorized topic. Complete rebalance and check that
+        // references to topic2 have been removed from SubscriptionState.
+        subscriptions.subscribe(Utils.mkSet(topic1), rebalanceListener);
+        assertEquals(Collections.singleton(topic1), subscriptions.metadataTopics());
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                Collections.emptyMap(), singletonMap(topic1, 1)));
+
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p)));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        assertEquals(singleton(topic1), subscriptions.subscription());
+        assertEquals(singleton(topic1), subscriptions.metadataTopics());
+    }
+
     @Test
     public void testWakeupFromAssignmentCallback() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 33d102d..b373192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -102,8 +102,11 @@ public class ConsumerMetadataTest {
     @Test
     public void testNormalSubscription() {
         subscription.subscribe(Utils.mkSet("foo", "bar", "__consumer_offsets"), new NoOpConsumerRebalanceListener());
-        subscription.groupSubscribe(Utils.mkSet("baz"));
+        subscription.groupSubscribe(Utils.mkSet("baz", "foo", "bar", "__consumer_offsets"));
         testBasicSubscription(Utils.mkSet("foo", "bar", "baz"), Utils.mkSet("__consumer_offsets"));
+
+        subscription.resetGroupSubscription();
+        testBasicSubscription(Utils.mkSet("foo", "bar"), Utils.mkSet("__consumer_offsets"));
     }
 
     @Test