You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/02/12 21:05:14 UTC

[kafka] branch 2.4 updated (62233f1 -> 1b0c10d)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 62233f1  KAFKA-9500: Fix FK Join Topology (#8015)
     new f670136  KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (#7941)
     new 1b0c10d  MINOR: Fix unnecessary metadata fetch before group assignment (#8095)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 checkstyle/suppressions.xml                        |  2 +-
 .../consumer/ConsumerPartitionAssignor.java        |  8 ++++
 .../consumer/internals/ConsumerCoordinator.java    |  5 +--
 .../consumer/internals/ConsumerMetadata.java       |  4 +-
 .../consumer/internals/SubscriptionState.java      | 32 +++++++--------
 .../internals/ConsumerCoordinatorTest.java         | 47 ++++++++++++++++++++--
 .../consumer/internals/ConsumerMetadataTest.java   |  5 ++-
 .../consumer/internals/SubscriptionStateTest.java  | 20 ++++++++-
 .../kafka/admin/ConsumerGroupCommandTest.scala     | 14 ++++---
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 33 +++++++++++----
 10 files changed, 125 insertions(+), 45 deletions(-)


[kafka] 02/02: MINOR: Fix unnecessary metadata fetch before group assignment (#8095)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1b0c10d2bf0d9da95dfea230d8d839e5f3769006
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 12 11:45:06 2020 -0800

    MINOR: Fix unnecessary metadata fetch before group assignment (#8095)
    
    The recent increase in the flakiness of one of the offset reset tests (KAFKA-9538) traces back to https://github.com/apache/kafka/pull/7941. After investigation, we found that following this patch, the consumer was sending an additional metadata request prior to performing the group assignment. This slight timing difference was enough to trigger the test failures. The problem turned out to be due to a bug in `SubscriptionState.groupSubscribe`, which no longer counted the local subscription when determining whether the group subscription contained new topics that required a metadata fetch.
    
    Without the fix, we saw 30-50% test failures locally. With it, I could no longer reproduce the failure. However, #6561 is probably still needed to improve the resilience of this test.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../consumer/ConsumerPartitionAssignor.java        |  8 ++++++
 .../consumer/internals/ConsumerCoordinator.java    |  3 --
 .../consumer/internals/SubscriptionState.java      | 12 ++++----
 .../consumer/internals/SubscriptionStateTest.java  | 20 +++++++++++--
 .../kafka/admin/ConsumerGroupCommandTest.scala     | 14 +++++----
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 33 ++++++++++++++++------
 6 files changed, 67 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index f9a4217..8708ea4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -154,6 +154,14 @@ public interface ConsumerPartitionAssignor {
         public ByteBuffer userData() {
             return userData;
         }
+
+        @Override
+        public String toString() {
+            return "Assignment(" +
+                    "partitions=" + partitions +
+                    (userData == null ? "" : ", userDataSize=" + userData.remaining()) +
+                    ')';
+        }
     }
 
     final class GroupSubscription {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index db8b255..b4d9fa5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1320,9 +1320,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             return version == other.version || partitionsPerTopic.equals(other.partitionsPerTopic);
         }
 
-        Map<String, Integer> partitionsPerTopic() {
-            return partitionsPerTopic;
-        }
     }
 
     private static class OffsetCommitCompletion {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4a3d15b..cdf358e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -187,15 +187,17 @@ public class SubscriptionState {
     }
 
     /**
-     * Add topics to the current group subscription. This is used by the group leader to ensure
+     * Set the current group subscription. This is used by the group leader to ensure
      * that it receives metadata updates for all topics that the group is interested in.
-     * @param topics The topics to add to the group subscription
+     *
+     * @param topics All topics from the group subscription
+     * @return true if the group subscription contains topics which are not part of the local subscription
      */
     synchronized boolean groupSubscribe(Collection<String> topics) {
         if (!partitionsAutoAssigned())
             throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
-        groupSubscription = new HashSet<>(groupSubscription);
-        return groupSubscription.addAll(topics);
+        groupSubscription = new HashSet<>(topics);
+        return !subscription.containsAll(groupSubscription);
     }
 
     /**
@@ -326,7 +328,7 @@ public class SubscriptionState {
     }
 
     /**
-     * Get the subcription topics for which metadata is required . For the leader, this will include
+     * Get the subscription topics for which metadata is required. For the leader, this will include
      * the union of the subscriptions of all group members. For followers, it is just that member's
      * subscription. This is used when querying topic metadata to detect the metadata changes which would
      * require rebalancing. The leader fetches metadata for all topics in the group so that it
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index c3ce02a..47d654e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -106,13 +106,29 @@ public class SubscriptionStateTest {
     }
 
     @Test
+    public void testGroupSubscribe() {
+        state.subscribe(singleton(topic1), rebalanceListener);
+        assertEquals(singleton(topic1), state.metadataTopics());
+
+        assertFalse(state.groupSubscribe(singleton(topic1)));
+        assertEquals(singleton(topic1), state.metadataTopics());
+
+        assertTrue(state.groupSubscribe(Utils.mkSet(topic, topic1)));
+        assertEquals(Utils.mkSet(topic, topic1), state.metadataTopics());
+
+        // `groupSubscribe` does not accumulate
+        assertFalse(state.groupSubscribe(singleton(topic1)));
+        assertEquals(singleton(topic1), state.metadataTopics());
+    }
+
+    @Test
     public void partitionAssignmentChangeOnPatternSubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
+        state.subscribeFromPattern(Collections.singleton(topic));
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
@@ -244,7 +260,7 @@ public class SubscriptionStateTest {
     @Test
     public void cantAssignPartitionForUnmatchedPattern() {
         state.subscribe(Pattern.compile(".*t"), rebalanceListener);
-        state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
+        state.subscribeFromPattern(Collections.singleton(topic));
         assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
     }
 
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 97b638f..853b2ca 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -65,20 +65,24 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
   }
 
   def committedOffsets(topic: String = topic, group: String = group): Map[TopicPartition, Long] = {
-    val props = new Properties
-    props.put("bootstrap.servers", brokerList)
-    props.put("group.id", group)
-    val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
+    val consumer = createNoAutoCommitConsumer(group)
     try {
       val partitions: Set[TopicPartition] = consumer.partitionsFor(topic)
         .asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)}
-
       consumer.committed(partitions.asJava).asScala.filter(_._2 != null).mapValues(_.offset()).toMap
     } finally {
       consumer.close()
     }
   }
 
+  def createNoAutoCommitConsumer(group: String): KafkaConsumer[String, String] = {
+    val props = new Properties
+    props.put("bootstrap.servers", brokerList)
+    props.put("group.id", group)
+    props.put("enable.auto.commit", "false")
+    new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
+  }
+
   def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
     val opts = new ConsumerGroupCommandOptions(args)
     val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 838444c..f9322b3 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -16,8 +16,6 @@ import java.io.{BufferedWriter, File, FileWriter}
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Calendar, Date, Properties}
 
-import scala.collection.Seq
-
 import joptsimple.OptionException
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
 import kafka.server.KafkaConfig
@@ -28,6 +26,9 @@ import org.apache.kafka.test
 import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
 class TimeConversionTests {
 
   @Test
@@ -462,12 +463,28 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     executor.shutdown()
   }
 
-  private def awaitConsumerProgress(topic: String = topic, group: String = group, count: Long): Unit = {
-    TestUtils.waitUntilTrue(() => {
-      val offsets = committedOffsets(topic = topic, group = group).values
-      count == offsets.sum
-    }, "Expected that consumer group has consumed all messages from topic/partition. " +
-      s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}")
+  private def awaitConsumerProgress(topic: String = topic,
+                                    group: String = group,
+                                    count: Long): Unit = {
+    val consumer = createNoAutoCommitConsumer(group)
+    try {
+      val partitions = consumer.partitionsFor(topic).asScala.map { partitionInfo =>
+        new TopicPartition(partitionInfo.topic, partitionInfo.partition)
+      }.toSet
+
+      TestUtils.waitUntilTrue(() => {
+        val committed = consumer.committed(partitions.asJava).values.asScala
+        val total = committed.foldLeft(0L) { case (currentSum, offsetAndMetadata) =>
+          currentSum + Option(offsetAndMetadata).map(_.offset).getOrElse(0L)
+        }
+        total == count
+      }, "Expected that consumer group has consumed all messages from topic/partition. " +
+        s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}")
+
+    } finally {
+      consumer.close()
+    }
+
   }
 
   private def resetAndAssertOffsets(args: Array[String],


[kafka] 01/02: KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (#7941)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f670136fd3bd2b41323bcf9480b48146c680c159
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jan 24 10:38:21 2020 +0000

    KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (#7941)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../consumer/internals/ConsumerCoordinator.java    |  2 +-
 .../consumer/internals/ConsumerMetadata.java       |  4 +-
 .../consumer/internals/SubscriptionState.java      | 22 ++++------
 .../internals/ConsumerCoordinatorTest.java         | 47 ++++++++++++++++++++--
 .../consumer/internals/ConsumerMetadataTest.java   |  5 ++-
 6 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c4d8453..941dd1b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -60,7 +60,7 @@
               files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator).java"/>
 
     <suppress checks="JavaNCSS"
-              files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java|KafkaAdminClient.java"/>
+              files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>
 
     <suppress checks="NPathComplexity"
               files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3f8d81d..db8b255 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1307,7 +1307,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
             Map<String, Integer> partitionsPerTopic = new HashMap<>();
-            for (String topic : subscription.groupSubscription()) {
+            for (String topic : subscription.metadataTopics()) {
                 Integer numPartitions = cluster.partitionCountForTopic(topic);
                 if (numPartitions != null)
                     partitionsPerTopic.put(topic, numPartitions);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index fbdf1c6..ef7d924 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -55,7 +55,7 @@ public class ConsumerMetadata extends Metadata {
         if (subscription.hasPatternSubscription())
             return MetadataRequest.Builder.allTopics();
         List<String> topics = new ArrayList<>();
-        topics.addAll(subscription.groupSubscription());
+        topics.addAll(subscription.metadataTopics());
         topics.addAll(transientTopics);
         return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
     }
@@ -72,7 +72,7 @@ public class ConsumerMetadata extends Metadata {
 
     @Override
     protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
-        if (transientTopics.contains(topic) || subscription.isGroupSubscribed(topic))
+        if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
             return true;
 
         if (isInternal && !includeInternalTopics)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 953505f..4a3d15b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -183,12 +183,6 @@ public class SubscriptionState {
             return false;
 
         subscription = topicsToSubscribe;
-        if (subscriptionType != SubscriptionType.USER_ASSIGNED) {
-            groupSubscription = new HashSet<>(groupSubscription);
-            groupSubscription.addAll(topicsToSubscribe);
-        } else {
-            groupSubscription = new HashSet<>(topicsToSubscribe);
-        }
         return true;
     }
 
@@ -208,7 +202,7 @@ public class SubscriptionState {
      * Reset the group's subscription to only contain topics subscribed by this consumer.
      */
     synchronized void resetGroupSubscription() {
-        groupSubscription = subscription;
+        groupSubscription = Collections.emptySet();
     }
 
     /**
@@ -332,9 +326,9 @@ public class SubscriptionState {
     }
 
     /**
-     * Get the subscription for the group. For the leader, this will include the union of the
-     * subscriptions of all group members. For followers, it is just that member's subscription.
-     * This is used when querying topic metadata to detect the metadata changes which would
+     * Get the subcription topics for which metadata is required . For the leader, this will include
+     * the union of the subscriptions of all group members. For followers, it is just that member's
+     * subscription. This is used when querying topic metadata to detect the metadata changes which would
      * require rebalancing. The leader fetches metadata for all topics in the group so that it
      * can do the partition assignment (which requires at least partition counts for all topics
      * to be assigned).
@@ -342,12 +336,12 @@ public class SubscriptionState {
      * @return The union of all subscribed topics in the group if this member is the leader
      *   of the current generation; otherwise it returns the same set as {@link #subscription()}
      */
-    synchronized Set<String> groupSubscription() {
-        return this.groupSubscription;
+    synchronized Set<String> metadataTopics() {
+        return groupSubscription.isEmpty() ? subscription : groupSubscription;
     }
 
-    synchronized boolean isGroupSubscribed(String topic) {
-        return groupSubscription.contains(topic);
+    synchronized boolean needsMetadata(String topic) {
+        return subscription.contains(topic) || groupSubscription.contains(topic);
     }
 
     private TopicPartitionState assignedState(TopicPartition tp) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 6617fa2..9e6128b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -578,7 +578,7 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(assigned), subscriptions.assignedPartitions());
-        assertEquals(subscription, subscriptions.groupSubscription());
+        assertEquals(subscription, subscriptions.metadataTopics());
         assertEquals(0, rebalanceListener.revokedCount);
         assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -637,7 +637,7 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
-        assertEquals(toSet(newSubscription), subscriptions.groupSubscription());
+        assertEquals(toSet(newSubscription), subscriptions.metadataTopics());
         assertEquals(protocol == EAGER ? 1 : 0, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(assigned, rebalanceListener.assigned);
@@ -676,7 +676,7 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(2, subscriptions.numAssignedPartitions());
-        assertEquals(2, subscriptions.groupSubscription().size());
+        assertEquals(2, subscriptions.metadataTopics().size());
         assertEquals(2, subscriptions.subscription().size());
         // callback not triggered at all since there's nothing to be revoked
         assertEquals(0, rebalanceListener.revokedCount);
@@ -907,7 +907,7 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(assigned), subscriptions.assignedPartitions());
-        assertEquals(subscription, subscriptions.groupSubscription());
+        assertEquals(subscription, subscriptions.metadataTopics());
         assertEquals(0, rebalanceListener.revokedCount);
         assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -1229,6 +1229,45 @@ public class ConsumerCoordinatorTest {
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
     }
 
+    /**
+     * Verifies that subscription change updates SubscriptionState correctly even after JoinGroup failures
+     * that don't re-invoke onJoinPrepare.
+     */
+    @Test
+    public void testSubscriptionChangeWithAuthorizationFailure() {
+        final String consumerId = "consumer";
+
+        // Subscribe to two topics of which only one is authorized and verify that metadata failure is propagated.
+        subscriptions.subscribe(Utils.mkSet(topic1, topic2), rebalanceListener);
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                Collections.singletonMap(topic2, Errors.TOPIC_AUTHORIZATION_FAILED), singletonMap(topic1, 1)));
+        assertThrows(TopicAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
+
+        client.respond(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Fail the first JoinGroup request
+        client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.emptyMap(),
+                Errors.GROUP_AUTHORIZATION_FAILED));
+        assertThrows(GroupAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
+
+        // Change subscription to include only the authorized topic. Complete rebalance and check that
+        // references to topic2 have been removed from SubscriptionState.
+        subscriptions.subscribe(Utils.mkSet(topic1), rebalanceListener);
+        assertEquals(Collections.singleton(topic1), subscriptions.metadataTopics());
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                Collections.emptyMap(), singletonMap(topic1, 1)));
+
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p)));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        assertEquals(singleton(topic1), subscriptions.subscription());
+        assertEquals(singleton(topic1), subscriptions.metadataTopics());
+    }
+
     @Test
     public void testWakeupFromAssignmentCallback() {
         final String topic = "topic1";
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 33d102d..b373192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -102,8 +102,11 @@ public class ConsumerMetadataTest {
     @Test
     public void testNormalSubscription() {
         subscription.subscribe(Utils.mkSet("foo", "bar", "__consumer_offsets"), new NoOpConsumerRebalanceListener());
-        subscription.groupSubscribe(Utils.mkSet("baz"));
+        subscription.groupSubscribe(Utils.mkSet("baz", "foo", "bar", "__consumer_offsets"));
         testBasicSubscription(Utils.mkSet("foo", "bar", "baz"), Utils.mkSet("__consumer_offsets"));
+
+        subscription.resetGroupSubscription();
+        testBasicSubscription(Utils.mkSet("foo", "bar"), Utils.mkSet("__consumer_offsets"));
     }
 
     @Test