You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/20 04:59:58 UTC

kafka git commit: KAFKA-3949: Fix race condition when metadata update arrives during rebalance

Repository: kafka
Updated Branches:
  refs/heads/trunk c5d26c482 -> 317c4fded


KAFKA-3949: Fix race condition when metadata update arrives during rebalance

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Vahid Hashemian, Guozhang Wang

Closes #1762 from hachikuji/KAFKA-3949


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/317c4fde
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/317c4fde
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/317c4fde

Branch: refs/heads/trunk
Commit: 317c4fdede41f2026b34f473af1ad69f8ee62a1d
Parents: c5d26c4
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Aug 19 21:59:55 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Aug 19 21:59:55 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java |  13 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  10 +-
 .../kafka/clients/consumer/MockConsumer.java    |   8 +-
 .../consumer/internals/AbstractCoordinator.java |  20 +-
 .../consumer/internals/ConsumerCoordinator.java |  59 ++--
 .../clients/consumer/internals/Fetcher.java     |  28 +-
 .../consumer/internals/SubscriptionState.java   | 100 +++----
 .../org/apache/kafka/clients/MockClient.java    |   2 +-
 .../internals/ConsumerCoordinatorTest.java      | 280 +++++++++++--------
 .../clients/consumer/internals/FetcherTest.java |  65 ++---
 .../internals/SubscriptionStateTest.java        |  77 +++--
 11 files changed, 354 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0fd5d63..a4cf730 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -211,8 +211,15 @@ public final class Metadata {
         for (Listener listener: listeners)
             listener.onMetadataUpdate(cluster);
 
-        // Do this after notifying listeners as subscribed topics' list can be changed by listeners
-        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
+
+        if (this.needMetadataForAllTopics) {
+            // the listener may change the interested topics, which could cause another metadata refresh.
+            // If we have already fetched all topics, however, another fetch should be unnecessary.
+            this.needUpdate = false;
+            this.cluster = getClusterForCurrentTopics(cluster);
+        } else {
+            this.cluster = cluster;
+        }
 
         notifyAll();
         log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
@@ -287,7 +294,7 @@ public final class Metadata {
         Set<String> unauthorizedTopics = new HashSet<>();
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
-        Set<String> internalTopics = Collections.<String>emptySet();
+        Set<String> internalTopics = Collections.emptySet();
         if (cluster != null) {
             internalTopics = cluster.internalTopics();
             unauthorizedTopics.addAll(cluster.unauthorizedTopics());

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ef91302..85d5194 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -801,7 +801,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                         throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                 }
                 log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
-                this.subscriptions.subscribe(topics, listener);
+                this.subscriptions.subscribe(new HashSet<>(topics), listener);
                 metadata.setTopics(subscriptions.groupSubscription());
             }
         } finally {
@@ -914,7 +914,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 }
 
                 log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
-                this.subscriptions.assignFromUser(partitions);
+                this.subscriptions.assignFromUser(new HashSet<>(partitions));
                 metadata.setTopics(topics);
             }
         } finally {
@@ -1007,6 +1007,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
         long now = time.milliseconds();
         client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now);
+
+        // after the long poll, we should check whether the group needs to rebalance
+        // prior to returning data so that the group can stabilize faster
+        if (coordinator.needRejoin())
+            return Collections.emptyMap();
+
         return fetcher.fetchedRecords();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9ab4c29..62eb77d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -94,26 +94,26 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
         this.subscriptions.subscribe(pattern, listener);
-        List<String> topicsToSubscribe = new ArrayList<>();
+        Set<String> topicsToSubscribe = new HashSet<>();
         for (String topic: partitions.keySet()) {
             if (pattern.matcher(topic).matches() &&
                 !subscriptions.subscription().contains(topic))
                 topicsToSubscribe.add(topic);
         }
         ensureNotClosed();
-        this.subscriptions.changeSubscription(topicsToSubscribe);
+        this.subscriptions.subscribeFromPattern(topicsToSubscribe);
     }
 
     @Override
     public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
-        this.subscriptions.subscribe(topics, listener);
+        this.subscriptions.subscribe(new HashSet<>(topics), listener);
     }
 
     @Override
     public void assign(Collection<TopicPartition> partitions) {
         ensureNotClosed();
-        this.subscriptions.assignFromUser(partitions);
+        this.subscriptions.assignFromUser(new HashSet<>(partitions));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 690df26..bf6b920 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -270,16 +270,6 @@ public abstract class AbstractCoordinator implements Closeable {
         // when sending heartbeats and does not necessarily require us to rejoin the group.
         ensureCoordinatorReady();
 
-        if (!needRejoin())
-            return;
-
-        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
-        // time if the client is woken up before a pending rebalance completes.
-        if (needsJoinPrepare) {
-            onJoinPrepare(generation.generationId, generation.memberId);
-            needsJoinPrepare = false;
-        }
-
         if (heartbeatThread == null) {
             heartbeatThread = new HeartbeatThread();
             heartbeatThread.start();
@@ -288,6 +278,16 @@ public abstract class AbstractCoordinator implements Closeable {
         while (needRejoin()) {
             ensureCoordinatorReady();
 
+            // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
+            // time if the client is woken up before a pending rebalance completes. This must be called
+            // on each iteration of the loop because an event requiring a rebalance (such as a metadata
+            // refresh which changes the matched subscription set) can occur while another rebalance is
+            // still in progress.
+            if (needsJoinPrepare) {
+                onJoinPrepare(generation.generationId, generation.memberId);
+                needsJoinPrepare = false;
+            }
+
             // ensure that there are no pending requests to the coordinator. This is important
             // in particular to avoid resending a pending JoinGroup request.
             if (client.pendingRequestCount(this.coordinator) > 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5fee45a..b8df50e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -78,6 +78,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     // of offset commit requests, which may be invoked from the heartbeat thread
     private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
 
+    private boolean isLeader = false;
+    private Set<String> joinedSubscription;
     private MetadataSnapshot metadataSnapshot;
     private MetadataSnapshot assignmentSnapshot;
     private long nextAutoCommitDeadline;
@@ -137,9 +139,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     @Override
     public List<ProtocolMetadata> metadata() {
+        this.joinedSubscription = subscriptions.subscription();
         List<ProtocolMetadata> metadataList = new ArrayList<>();
         for (PartitionAssignor assignor : assignors) {
-            Subscription subscription = assignor.subscription(subscriptions.subscription());
+            Subscription subscription = assignor.subscription(joinedSubscription);
             ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
             metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
         }
@@ -155,26 +158,26 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
 
                 if (subscriptions.hasPatternSubscription()) {
-                    final List<String> topicsToSubscribe = new ArrayList<>();
+                    final Set<String> topicsToSubscribe = new HashSet<>();
 
                     for (String topic : cluster.topics())
                         if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
                                 !(excludeInternalTopics && cluster.internalTopics().contains(topic)))
                             topicsToSubscribe.add(topic);
 
-                    subscriptions.changeSubscription(topicsToSubscribe);
+                    subscriptions.subscribeFromPattern(topicsToSubscribe);
+
+                    // note we still need to update the topics contained in the metadata. Although we have
+                    // specified that all topics should be fetched, only those set explicitly will be retained
                     metadata.setTopics(subscriptions.groupSubscription());
                 }
 
                 // check if there are any changes to the metadata which should trigger a rebalance
                 if (subscriptions.partitionsAutoAssigned()) {
                     MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
-                    if (!snapshot.equals(metadataSnapshot)) {
+                    if (!snapshot.equals(metadataSnapshot))
                         metadataSnapshot = snapshot;
-                        subscriptions.needReassignment();
-                    }
                 }
-
             }
         });
     }
@@ -192,12 +195,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                   String memberId,
                                   String assignmentStrategy,
                                   ByteBuffer assignmentBuffer) {
-        // if we were the assignor, then we need to make sure that there have been no metadata updates
-        // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
-        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
-            subscriptions.needReassignment();
-            return;
-        }
+        // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
+        if (!isLeader)
+            assignmentSnapshot = null;
 
         PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
@@ -246,13 +246,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             now = time.milliseconds();
         }
 
-        if (subscriptions.partitionsAutoAssigned() && needRejoin()) {
-            // due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
-            // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
-            // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
-            // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
-            // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
-            // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
+        if (needRejoin()) {
+            // due to a race condition between the initial metadata fetch and the initial rebalance,
+            // we need to ensure that the metadata is fresh before joining initially. This ensures
+            // that we have matched the pattern against the cluster's topics at least once before joining.
             if (subscriptions.hasPatternSubscription())
                 client.ensureFreshMetadata();
 
@@ -303,6 +300,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // update metadata (if needed) and keep track of the metadata used for assignment so that
         // we can check after rebalance completion whether anything has changed
         client.ensureFreshMetadata();
+
+        isLeader = true;
         assignmentSnapshot = metadataSnapshot;
 
         log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
@@ -339,14 +338,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     listener.getClass().getName(), groupId, e);
         }
 
-        assignmentSnapshot = null;
-        subscriptions.needReassignment();
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
     }
 
     @Override
-    protected boolean needRejoin() {
-        return subscriptions.partitionsAutoAssigned() &&
-                (super.needRejoin() || subscriptions.partitionAssignmentNeeded());
+    public boolean needRejoin() {
+        if (!subscriptions.partitionsAutoAssigned())
+            return false;
+
+        // we need to rejoin if we performed the assignment and metadata has changed
+        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
+            return true;
+
+        // we need to join if our subscription has changed since the last join
+        if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
+            return true;
+
+        return super.needRejoin();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 84278c6..aa5cdbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -350,26 +350,22 @@ public class Fetcher<K, V> {
      *         the defaultResetPolicy is NONE
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
-        if (this.subscriptions.partitionAssignmentNeeded()) {
-            return Collections.emptyMap();
-        } else {
-            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
-            int recordsRemaining = maxPollRecords;
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
+        int recordsRemaining = maxPollRecords;
 
-            while (recordsRemaining > 0) {
-                if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
-                    CompletedFetch completedFetch = completedFetches.poll();
-                    if (completedFetch == null)
-                        break;
+        while (recordsRemaining > 0) {
+            if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+                CompletedFetch completedFetch = completedFetches.poll();
+                if (completedFetch == null)
+                    break;
 
-                    nextInLineRecords = parseFetchedData(completedFetch);
-                } else {
-                    recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
-                }
+                nextInLineRecords = parseFetchedData(completedFetch);
+            } else {
+                recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
             }
-
-            return drained;
         }
+
+        return drained;
     }
 
     private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index e9b2eb2..6d4c01b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,6 +47,8 @@ import java.util.regex.Pattern;
  * to set the initial fetch position (e.g. {@link Fetcher#resetOffset(TopicPartition)}.
  */
 public class SubscriptionState {
+    private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
+            "Subscription to topics, partitions and pattern are mutually exclusive";
 
     private enum SubscriptionType {
         NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
@@ -58,20 +61,17 @@ public class SubscriptionState {
     private Pattern subscribedPattern;
 
     /* the list of topics the user has requested */
-    private final Set<String> subscription;
+    private Set<String> subscription;
+
+    /* the list of partitions the user has requested */
+    private Set<TopicPartition> userAssignment;
 
     /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
     private final Set<String> groupSubscription;
 
-    /* the list of partitions the user has requested */
-    private final Set<TopicPartition> userAssignment;
-
     /* the list of partitions currently assigned */
     private final Map<TopicPartition, TopicPartitionState> assignment;
 
-    /* do we need to request a partition assignment from the coordinator? */
-    private boolean needsPartitionAssignment;
-
     /* do we need to request the latest committed offsets from the coordinator? */
     private boolean needsFetchCommittedOffsets;
 
@@ -81,8 +81,16 @@ public class SubscriptionState {
     /* Listener to be invoked when assignment changes */
     private ConsumerRebalanceListener listener;
 
-    private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
-        "Subscription to topics, partitions and pattern are mutually exclusive";
+    public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
+        this.defaultResetStrategy = defaultResetStrategy;
+        this.subscription = Collections.emptySet();
+        this.userAssignment = Collections.emptySet();
+        this.assignment = new HashMap<>();
+        this.groupSubscription = new HashSet<>();
+        this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
+        this.subscribedPattern = null;
+        this.subscriptionType = SubscriptionType.NONE;
+    }
 
     /**
      * This method sets the subscription type if it is not already set (i.e. when it is NONE),
@@ -97,19 +105,7 @@ public class SubscriptionState {
             throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
     }
 
-    public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
-        this.defaultResetStrategy = defaultResetStrategy;
-        this.subscription = new HashSet<>();
-        this.userAssignment = new HashSet<>();
-        this.assignment = new HashMap<>();
-        this.groupSubscription = new HashSet<>();
-        this.needsPartitionAssignment = false;
-        this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
-        this.subscribedPattern = null;
-        this.subscriptionType = SubscriptionType.NONE;
-    }
-
-    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
+    public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be null");
 
@@ -120,12 +116,18 @@ public class SubscriptionState {
         changeSubscription(topics);
     }
 
-    public void changeSubscription(Collection<String> topicsToSubscribe) {
-        if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) {
-            this.subscription.clear();
-            this.subscription.addAll(topicsToSubscribe);
+    public void subscribeFromPattern(Set<String> topics) {
+        if (subscriptionType != SubscriptionType.AUTO_PATTERN)
+            throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " +
+                    subscriptionType);
+
+        changeSubscription(topics);
+    }
+
+    private void changeSubscription(Set<String> topicsToSubscribe) {
+        if (!this.subscription.equals(topicsToSubscribe)) {
+            this.subscription = topicsToSubscribe;
             this.groupSubscription.addAll(topicsToSubscribe);
-            this.needsPartitionAssignment = true;
 
             // Remove any assigned partitions which are no longer subscribed to
             for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
@@ -147,9 +149,11 @@ public class SubscriptionState {
         this.groupSubscription.addAll(topics);
     }
 
-    public void needReassignment() {
+    /**
+     * Reset the group's subscription to only contain topics subscribed by this consumer.
+     */
+    public void resetGroupSubscription() {
         this.groupSubscription.retainAll(subscription);
-        this.needsPartitionAssignment = true;
     }
 
     /**
@@ -157,34 +161,37 @@ public class SubscriptionState {
      * note this is different from {@link #assignFromSubscribed(Collection)}
      * whose input partitions are provided from the subscribed topics.
      */
-    public void assignFromUser(Collection<TopicPartition> partitions) {
+    public void assignFromUser(Set<TopicPartition> partitions) {
         setSubscriptionType(SubscriptionType.USER_ASSIGNED);
 
-        this.userAssignment.clear();
-        this.userAssignment.addAll(partitions);
+        if (!this.assignment.keySet().equals(partitions)) {
+            this.userAssignment = partitions;
 
-        for (TopicPartition partition : partitions)
-            if (!assignment.containsKey(partition))
-                addAssignedPartition(partition);
-
-        this.assignment.keySet().retainAll(this.userAssignment);
-
-        this.needsPartitionAssignment = false;
-        this.needsFetchCommittedOffsets = true;
+            for (TopicPartition partition : partitions)
+                if (!assignment.containsKey(partition))
+                    addAssignedPartition(partition);
+            this.assignment.keySet().retainAll(this.userAssignment);
+            this.needsFetchCommittedOffsets = true;
+        }
     }
 
     /**
      * Change the assignment to the specified partitions returned from the coordinator,
-     * note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs
+     * note this is different from {@link #assignFromUser(Set)} which directly set the assignment from user inputs
      */
     public void assignFromSubscribed(Collection<TopicPartition> assignments) {
+        if (!this.partitionsAutoAssigned())
+            throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");
+
         for (TopicPartition tp : assignments)
             if (!this.subscription.contains(tp.topic()))
                 throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
+
+        // after rebalancing, we always reinitialize the assignment state
         this.assignment.clear();
         for (TopicPartition tp: assignments)
             addAssignedPartition(tp);
-        this.needsPartitionAssignment = false;
+        this.needsFetchCommittedOffsets = true;
     }
 
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -202,10 +209,9 @@ public class SubscriptionState {
     }
 
     public void unsubscribe() {
-        this.subscription.clear();
-        this.userAssignment.clear();
+        this.subscription = Collections.emptySet();
+        this.userAssignment = Collections.emptySet();
         this.assignment.clear();
-        this.needsPartitionAssignment = true;
         this.subscribedPattern = null;
         this.subscriptionType = SubscriptionType.NONE;
     }
@@ -346,10 +352,6 @@ public class SubscriptionState {
         return missing;
     }
 
-    public boolean partitionAssignmentNeeded() {
-        return this.needsPartitionAssignment;
-    }
-
     public boolean isAssigned(TopicPartition tp) {
         return assignment.containsKey(tp);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 9fbbb88..8881f82 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -147,7 +147,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public List<ClientResponse> poll(long timeoutMs, long now) {
-        List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
+        List<ClientResponse> copy = new ArrayList<>(this.responses);
 
         while (!this.responses.isEmpty()) {
             ClientResponse response = this.responses.poll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8ec8b75..0486e6c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -66,9 +66,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -148,7 +152,7 @@ public class ConsumerCoordinatorTest {
 
     @Test(expected = GroupAuthorizationException.class)
     public void testGroupReadUnauthorized() {
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -206,7 +210,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
         subscriptions.assignFromSubscribed(Collections.singletonList(tp));
 
         time.sleep(sessionTimeoutMs);
@@ -230,7 +234,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
         subscriptions.assignFromSubscribed(Collections.singletonList(tp));
 
         time.sleep(sessionTimeoutMs);
@@ -273,8 +277,7 @@ public class ConsumerCoordinatorTest {
     public void testJoinGroupInvalidGroupId() { 
         final String consumerId = "leader";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(Arrays.asList(topicName));
@@ -292,8 +295,7 @@ public class ConsumerCoordinatorTest {
     public void testNormalJoinGroupLeader() {
         final String consumerId = "leader";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(Arrays.asList(topicName));
@@ -304,7 +306,7 @@ public class ConsumerCoordinatorTest {
 
         // normal join group
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
-        partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp)));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -315,23 +317,86 @@ public class ConsumerCoordinatorTest {
                         sync.generationId() == 1 &&
                         sync.groupAssignment().containsKey(consumerId);
             }
-        }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        }, syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertFalse(coordinator.needRejoin());
+        assertEquals(singleton(tp), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptySet(), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+        assertEquals(singleton(tp), rebalanceListener.assigned);
+    }
+
+    @Test
+    public void testMetadataRefreshDuringRebalance() {
+        final String consumerId = "leader";
+        final String otherTopicName = "otherTopic";
+        TopicPartition otherPartition = new TopicPartition(otherTopicName, 0);
+
+        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+        metadata.needMetadataForAllTopics(true);
+        metadata.update(cluster, time.milliseconds());
+
+        assertEquals(singleton(topicName), subscriptions.subscription());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorReady();
+
+        Map<String, List<String>> initialSubscription = singletonMap(consumerId, Arrays.asList(topicName));
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(tp)));
+
+        // the metadata will be updated in flight with a new topic added
+        final List<String> updatedSubscription = Arrays.asList(topicName, otherTopicName);
+        final Set<String> updatedSubscriptionSet = new HashSet<>(updatedSubscription);
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE.code()));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                final Map<String, Integer> updatedPartitions = new HashMap<>();
+                for (String topic : updatedSubscription)
+                    updatedPartitions.put(topic, 1);
+                metadata.update(TestUtils.clusterWith(1, updatedPartitions), time.milliseconds());
+                return true;
+            }
+        }, syncGroupResponse(singletonList(tp), Errors.NONE.code()));
+
+        List<TopicPartition> newAssignment = Arrays.asList(tp, otherPartition);
+        Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
+
+        Map<String, List<String>> updatedSubscriptions = singletonMap(consumerId, Arrays.asList(topicName, otherTopicName));
+        partitionAssignor.prepare(singletonMap(consumerId, newAssignment));
+
+        // we expect to see a second rebalance with the new-found topics
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                JoinGroupRequest join = new JoinGroupRequest(request.request().body());
+                ProtocolMetadata protocolMetadata = join.groupProtocols().iterator().next();
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata.metadata());
+                protocolMetadata.metadata().rewind();
+                return subscription.topics().containsAll(updatedSubscriptionSet);
+            }
+        }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE.code()));
+
+        coordinator.poll(time.milliseconds());
+
+        assertFalse(coordinator.needRejoin());
+        assertEquals(updatedSubscriptionSet, subscriptions.subscription());
+        assertEquals(newAssignmentSet, subscriptions.assignedPartitions());
+        assertEquals(2, rebalanceListener.revokedCount);
+        assertEquals(singleton(tp), rebalanceListener.revoked);
+        assertEquals(2, rebalanceListener.assignedCount);
+        assertEquals(newAssignmentSet, rebalanceListener.assigned);
     }
 
     @Test
     public void testWakeupDuringJoin() {
         final String consumerId = "leader";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(Arrays.asList(topicName));
@@ -341,7 +406,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
-        partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp)));
 
         // prepare only the first half of the join and then trigger the wakeup
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
@@ -354,23 +419,22 @@ public class ConsumerCoordinatorTest {
         }
 
         // now complete the second half
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertFalse(coordinator.needRejoin());
+        assertEquals(singleton(tp), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptySet(), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+        assertEquals(singleton(tp), rebalanceListener.assigned);
     }
 
     @Test
     public void testNormalJoinGroupFollower() {
         final String consumerId = "consumer";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -385,29 +449,28 @@ public class ConsumerCoordinatorTest {
                         sync.generationId() == 1 &&
                         sync.groupAssignment().isEmpty();
             }
-        }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        }, syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
         coordinator.poll(time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertFalse(coordinator.needRejoin());
+        assertEquals(singleton(tp), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+        assertEquals(singleton(tp), rebalanceListener.assigned);
     }
 
     @Test
     public void testLeaveGroupOnClose() {
         final String consumerId = "consumer";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
         final AtomicBoolean received = new AtomicBoolean(false);
@@ -428,14 +491,13 @@ public class ConsumerCoordinatorTest {
     public void testMaybeLeaveGroup() {
         final String consumerId = "consumer";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
         final AtomicBoolean received = new AtomicBoolean(false);
@@ -459,8 +521,7 @@ public class ConsumerCoordinatorTest {
     public void testUnexpectedErrorOnSyncGroup() {
         final String consumerId = "consumer";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -475,8 +536,7 @@ public class ConsumerCoordinatorTest {
     public void testUnknownMemberIdOnSyncGroup() {
         final String consumerId = "consumer";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -493,20 +553,19 @@ public class ConsumerCoordinatorTest {
                 return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
             }
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
         coordinator.poll(time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertFalse(coordinator.needRejoin());
+        assertEquals(singleton(tp), subscriptions.assignedPartitions());
     }
 
     @Test
     public void testRebalanceInProgressOnSyncGroup() {
         final String consumerId = "consumer";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -517,20 +576,19 @@ public class ConsumerCoordinatorTest {
 
         // then let the full join/sync finish successfully
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
         coordinator.poll(time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertFalse(coordinator.needRejoin());
+        assertEquals(singleton(tp), subscriptions.assignedPartitions());
     }
 
     @Test
     public void testIllegalGenerationOnSyncGroup() {
         final String consumerId = "consumer";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -547,39 +605,45 @@ public class ConsumerCoordinatorTest {
                 return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
             }
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
         coordinator.poll(time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertFalse(coordinator.needRejoin());
+        assertEquals(singleton(tp), subscriptions.assignedPartitions());
     }
 
     @Test
     public void testMetadataChangeTriggersRebalance() { 
         final String consumerId = "consumer";
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        // ensure metadata is up-to-date for leader
+        metadata.setTopics(Arrays.asList(topicName));
+        metadata.update(cluster, time.milliseconds());
+
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp)));
+
+        // the leader is responsible for picking up metadata changes and forcing a group rebalance
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
         coordinator.poll(time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertFalse(coordinator.needRejoin());
 
         // a new partition is added to the topic
         metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
 
         // we should detect the change and ask for reassignment
-        assertTrue(subscriptions.partitionAssignmentNeeded());
+        assertTrue(coordinator.needRejoin());
     }
 
-
     @Test
     public void testUpdateMetadataDuringRebalance() {
         final String topic1 = "topic1";
@@ -590,9 +654,8 @@ public class ConsumerCoordinatorTest {
 
         List<String> topics = Arrays.asList(topic1, topic2);
 
-        subscriptions.subscribe(topics, rebalanceListener);
+        subscriptions.subscribe(new HashSet<>(topics), rebalanceListener);
         metadata.setTopics(topics);
-        subscriptions.needReassignment();
 
         // we only have metadata for one topic initially
         metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
@@ -629,7 +692,7 @@ public class ConsumerCoordinatorTest {
 
         coordinator.poll(time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertFalse(coordinator.needRejoin());
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
     }
 
@@ -640,7 +703,7 @@ public class ConsumerCoordinatorTest {
 
         metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
 
-        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
     }
 
     @Test
@@ -650,41 +713,43 @@ public class ConsumerCoordinatorTest {
 
         metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
 
-        assertTrue(subscriptions.partitionAssignmentNeeded());
+        assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
     }
     
     @Test
     public void testRejoinGroup() {
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        String otherTopic = "otherTopic";
+
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
         assertEquals(1, rebalanceListener.revokedCount);
+        assertTrue(rebalanceListener.revoked.isEmpty());
         assertEquals(1, rebalanceListener.assignedCount);
+        assertEquals(singleton(tp), rebalanceListener.assigned);
 
         // and join the group again
-        subscriptions.needReassignment();
+        subscriptions.subscribe(new HashSet<>(Arrays.asList(topicName, otherTopic)), rebalanceListener);
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
         assertEquals(2, rebalanceListener.revokedCount);
-        assertEquals(Collections.singleton(tp), rebalanceListener.revoked);
+        assertEquals(singleton(tp), rebalanceListener.revoked);
         assertEquals(2, rebalanceListener.assignedCount);
-        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+        assertEquals(singleton(tp), rebalanceListener.assigned);
     }
 
     @Test
     public void testDisconnectInJoin() {
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -693,19 +758,18 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
-        assertFalse(subscriptions.partitionAssignmentNeeded());
-        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertFalse(coordinator.needRejoin());
+        assertEquals(singleton(tp), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+        assertEquals(singleton(tp), rebalanceListener.assigned);
     }
 
     @Test(expected = ApiException.class)
     public void testInvalidSessionTimeout() {
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -717,7 +781,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetOnly() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -739,14 +803,13 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                 ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
         subscriptions.seek(tp, 100);
@@ -765,8 +828,7 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                 ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.needReassignment();
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -776,7 +838,7 @@ public class ConsumerCoordinatorTest {
         consumerClient.poll(0);
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
         subscriptions.seek(tp, 100);
@@ -793,7 +855,7 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                 ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 100);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
@@ -811,7 +873,7 @@ public class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                 ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 100);
 
         // no commit initially since coordinator is unknown
@@ -835,7 +897,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetMetadata() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -866,20 +928,20 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitAfterLeaveGroup() {
         // enable auto-assignment
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.subscribe(singleton(topicName), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
         coordinator.poll(time.milliseconds());
 
         // now switch to manual assignment
         client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());
         subscriptions.unsubscribe();
         coordinator.maybeLeaveGroup();
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
 
         // the client should not reuse generation/memberId from auto-subscribed generation
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -1048,7 +1110,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1061,7 +1123,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.GROUP_LOAD_IN_PROGRESS.code(), "", 100L));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
@@ -1075,7 +1137,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
@@ -1090,7 +1152,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
         coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1122,37 +1184,6 @@ public class ConsumerCoordinatorTest {
         }
     }
 
-    @Test
-    public void testMetadataTopicsExpiryDisabled() {
-        final String consumerId = "consumer";
-
-        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        HashSet<String> topics = new HashSet<>();
-        topics.add(topicName);
-        metadata.setTopics(topics);
-        subscriptions.needReassignment();
-
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorReady();
-
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
-
-        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
-        assertTrue("Topic not found in metadata", metadata.containsTopic(topicName));
-        time.sleep(Metadata.TOPIC_EXPIRY_MS * 2);
-        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
-        assertTrue("Topic expired", metadata.containsTopic(topicName));
-        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
-        metadata.update(Cluster.empty(), time.milliseconds());
-        assertTrue("Topic expired", metadata.containsTopic(topicName));
-
-        assertTrue(subscriptions.partitionAssignmentNeeded());
-        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
-        assertTrue(subscriptions.partitionAssignmentNeeded());
-    }
-
     private ConsumerCoordinator buildCoordinator(Metrics metrics,
                                                  List<PartitionAssignor> assignors,
                                                  boolean excludeInternalTopics,
@@ -1187,7 +1218,8 @@ public class ConsumerCoordinatorTest {
         return response.toStruct();
     }
 
-    private Struct joinGroupLeaderResponse(int generationId, String memberId,
+    private Struct joinGroupLeaderResponse(int generationId,
+                                           String memberId,
                                            Map<String, List<String>> subscriptions,
                                            short error) {
         Map<String, ByteBuffer> metadata = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5186618..5c0b49c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -66,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -123,7 +124,7 @@ public class FetcherTest {
     @Test
     public void testFetchNormal() {
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch
@@ -167,7 +168,7 @@ public class FetcherTest {
 
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);
 
-        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 1);
 
         client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -209,7 +210,7 @@ public class FetcherTest {
         compressor.close();
         buffer.flip();
 
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch
@@ -230,7 +231,7 @@ public class FetcherTest {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
 
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 1);
 
         client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -272,7 +273,7 @@ public class FetcherTest {
         records.close();
 
         List<ConsumerRecord<byte[], byte[]>> consumerRecords;
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch
@@ -290,7 +291,7 @@ public class FetcherTest {
 
     @Test(expected = RecordTooLargeException.class)
     public void testFetchRecordTooLarge() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         // prepare large record
@@ -309,7 +310,7 @@ public class FetcherTest {
 
     @Test
     public void testUnauthorizedTopic() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         // resize the limit of the buffer to pretend it is only fetch-size large
@@ -320,20 +321,20 @@ public class FetcherTest {
             fetcher.fetchedRecords();
             fail("fetchedRecords should have thrown");
         } catch (TopicAuthorizationException e) {
-            assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
+            assertEquals(singleton(topicName), e.unauthorizedTopics());
         }
     }
 
     @Test
     public void testFetchDuringRebalance() {
-        subscriptions.subscribe(Arrays.asList(topicName), listener);
-        subscriptions.assignFromSubscribed(Arrays.asList(tp));
+        subscriptions.subscribe(singleton(topicName), listener);
+        subscriptions.assignFromSubscribed(singleton(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.sendFetches();
 
         // Now the rebalance happens and fetch positions are cleared
-        subscriptions.assignFromSubscribed(Arrays.asList(tp));
+        subscriptions.assignFromSubscribed(singleton(tp));
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
 
@@ -343,7 +344,7 @@ public class FetcherTest {
 
     @Test
     public void testInFlightFetchOnPausedPartition() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.sendFetches();
@@ -356,7 +357,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchOnPausedPartition() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         subscriptions.pause(tp);
@@ -366,7 +367,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchNotLeaderForPartition() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.sendFetches();
@@ -378,7 +379,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchUnknownTopicOrPartition() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.sendFetches();
@@ -390,7 +391,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchOffsetOutOfRange() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.sendFetches();
@@ -405,7 +406,7 @@ public class FetcherTest {
     public void testStaleOutOfRangeError() {
         // verify that an out of range error which arrives after a seek
         // does not cause us to reset our position or throw an exception
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.sendFetches();
@@ -419,7 +420,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchedRecordsAfterSeek() {
-        subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
+        subscriptionsNoAutoReset.assignFromUser(singleton(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
         fetcherNoAutoReset.sendFetches();
@@ -432,7 +433,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchOffsetOutOfRangeException() {
-        subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
+        subscriptionsNoAutoReset.assignFromUser(singleton(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
         fetcherNoAutoReset.sendFetches();
@@ -452,7 +453,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchDisconnected() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.sendFetches();
@@ -470,22 +471,22 @@ public class FetcherTest {
     public void testUpdateFetchPositionToCommitted() {
         // unless a specific reset is expected, the default behavior is to reset to the committed
         // position if one is present
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.committed(tp, new OffsetAndMetadata(5));
 
-        fetcher.updateFetchPositions(Collections.singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp));
         assertTrue(subscriptions.isFetchable(tp));
         assertEquals(5, subscriptions.position(tp).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionResetToDefaultOffset() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         // with no commit position, we should reset using the default strategy defined above (EARLIEST)
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
-        fetcher.updateFetchPositions(Collections.singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
         assertEquals(5, subscriptions.position(tp).longValue());
@@ -493,12 +494,12 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionResetToLatestOffset() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
-        fetcher.updateFetchPositions(Collections.singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
         assertEquals(5, subscriptions.position(tp).longValue());
@@ -506,12 +507,12 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionResetToEarliestOffset() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
-        fetcher.updateFetchPositions(Collections.singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
         assertEquals(5, subscriptions.position(tp).longValue());
@@ -519,7 +520,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionDisconnect() {
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
@@ -529,7 +530,7 @@ public class FetcherTest {
         // Next one succeeds
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
-        fetcher.updateFetchPositions(Collections.singleton(tp));
+        fetcher.updateFetchPositions(singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
         assertEquals(5, subscriptions.position(tp).longValue());
@@ -567,7 +568,7 @@ public class FetcherTest {
             fetcher.getAllTopicMetadata(10L);
             fail();
         } catch (TopicAuthorizationException e) {
-            assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
+            assertEquals(singleton(topicName), e.unauthorizedTopics());
         }
     }
 
@@ -600,7 +601,7 @@ public class FetcherTest {
     @Test
     public void testQuotaMetrics() throws Exception {
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch

http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 3b4b10e..783f0e6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -16,21 +16,23 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static java.util.Arrays.asList;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.regex.Pattern;
 
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class SubscriptionStateTest {
 
@@ -43,16 +45,15 @@ public class SubscriptionStateTest {
 
     @Test
     public void partitionAssignment() {
-        state.assignFromUser(Arrays.asList(tp0));
-        assertEquals(Collections.singleton(tp0), state.assignedPartitions());
-        assertFalse(state.partitionAssignmentNeeded());
+        state.assignFromUser(singleton(tp0));
+        assertEquals(singleton(tp0), state.assignedPartitions());
         assertFalse(state.hasAllFetchPositions());
         assertTrue(state.refreshCommitsNeeded());
         state.committed(tp0, new OffsetAndMetadata(1));
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
         assertAllPositions(tp0, 1L);
-        state.assignFromUser(Arrays.<TopicPartition>asList());
+        state.assignFromUser(Collections.<TopicPartition>emptySet());
         assertTrue(state.assignedPartitions().isEmpty());
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp0));
@@ -60,7 +61,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void partitionReset() {
-        state.assignFromUser(Arrays.asList(tp0));
+        state.assignFromUser(singleton(tp0));
         state.seek(tp0, 5);
         assertEquals(5L, (long) state.position(tp0));
         state.needOffsetReset(tp0);
@@ -76,9 +77,8 @@ public class SubscriptionStateTest {
 
     @Test
     public void topicSubscription() {
-        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        state.subscribe(singleton(topic), rebalanceListener);
         assertEquals(1, state.subscription().size());
-        assertTrue(state.partitionAssignmentNeeded());
         assertTrue(state.assignedPartitions().isEmpty());
         assertTrue(state.partitionsAutoAssigned());
         state.assignFromSubscribed(asList(tp0));
@@ -87,15 +87,14 @@ public class SubscriptionStateTest {
         assertAllPositions(tp0, 1L);
         state.assignFromSubscribed(asList(tp1));
         assertTrue(state.isAssigned(tp1));
-        assertFalse(state.partitionAssignmentNeeded());
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp1));
-        assertEquals(Collections.singleton(tp1), state.assignedPartitions());
+        assertEquals(singleton(tp1), state.assignedPartitions());
     }
 
     @Test
     public void partitionPause() {
-        state.assignFromUser(Arrays.asList(tp0));
+        state.assignFromUser(singleton(tp0));
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
         state.pause(tp0);
@@ -106,7 +105,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void commitOffsetMetadata() {
-        state.assignFromUser(Arrays.asList(tp0));
+        state.assignFromUser(singleton(tp0));
         state.committed(tp0, new OffsetAndMetadata(5, "hi"));
 
         assertEquals(5, state.committed(tp0).offset());
@@ -115,7 +114,7 @@ public class SubscriptionStateTest {
 
     @Test(expected = IllegalStateException.class)
     public void invalidPositionUpdate() {
-        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        state.subscribe(singleton(topic), rebalanceListener);
         state.assignFromSubscribed(asList(tp0));
         state.position(tp0, 0);
     }
@@ -132,32 +131,32 @@ public class SubscriptionStateTest {
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribeTopicAndPattern() {
-        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        state.subscribe(singleton(topic), rebalanceListener);
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePartitionAndPattern() {
-        state.assignFromUser(Arrays.asList(tp0));
+        state.assignFromUser(singleton(tp0));
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePatternAndTopic() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
-        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        state.subscribe(singleton(topic), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePatternAndPartition() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
-        state.assignFromUser(Arrays.asList(tp0));
+        state.assignFromUser(singleton(tp0));
     }
 
     @Test
     public void patternSubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
-        state.changeSubscription(Arrays.asList(topic, topic1));
+        state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
 
         assertEquals(
                 "Expected subscribed topics count is incorrect", 2, state.subscription().size());
@@ -165,43 +164,37 @@ public class SubscriptionStateTest {
 
     @Test
     public void unsubscribeUserAssignment() {
-        state.assignFromUser(Arrays.asList(tp0, tp1));
+        state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
         state.unsubscribe();
-        state.subscribe(Arrays.asList(topic), rebalanceListener);
-        assertEquals(Collections.singleton(topic), state.subscription());
+        state.subscribe(singleton(topic), rebalanceListener);
+        assertEquals(singleton(topic), state.subscription());
     }
 
     @Test
     public void unsubscribeUserSubscribe() {
-        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        state.subscribe(singleton(topic), rebalanceListener);
         state.unsubscribe();
-        state.assignFromUser(Arrays.asList(tp0));
-        assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+        state.assignFromUser(singleton(tp0));
+        assertEquals(singleton(tp0), state.assignedPartitions());
     }
 
     @Test
     public void unsubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
-        state.changeSubscription(Arrays.asList(topic, topic1));
-        assertTrue(state.partitionAssignmentNeeded());
-
+        state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
         state.assignFromSubscribed(asList(tp1));
-        assertEquals(Collections.singleton(tp1), state.assignedPartitions());
-        assertFalse(state.partitionAssignmentNeeded());
+        assertEquals(singleton(tp1), state.assignedPartitions());
 
         state.unsubscribe();
         assertEquals(0, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
-        assertTrue(state.partitionAssignmentNeeded());
 
-        state.assignFromUser(Arrays.asList(tp0));
-        assertEquals(Collections.singleton(tp0), state.assignedPartitions());
-        assertFalse(state.partitionAssignmentNeeded());
+        state.assignFromUser(singleton(tp0));
+        assertEquals(singleton(tp0), state.assignedPartitions());
 
         state.unsubscribe();
         assertEquals(0, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
-        assertTrue(state.partitionAssignmentNeeded());
     }
 
     private static class MockRebalanceListener implements ConsumerRebalanceListener {