You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/23 06:01:07 UTC

kafka git commit: KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()

Repository: kafka
Updated Branches:
  refs/heads/trunk bf292a6fa -> aa56dfb9e


KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jason Gustafson, Jun Rao

Closes #352 from guozhangwang/K2686


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa56dfb9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa56dfb9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa56dfb9

Branch: refs/heads/trunk
Commit: aa56dfb9e7cea19faa545a13d42d499a6958cbef
Parents: bf292a6
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Oct 22 21:06:10 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 22 21:06:10 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 20 ++++-
 .../kafka/clients/consumer/MockConsumer.java    |  4 +-
 .../consumer/internals/ConsumerCoordinator.java |  2 +-
 .../consumer/internals/SubscriptionState.java   | 38 +++++----
 .../clients/consumer/KafkaConsumerTest.java     | 32 ++++++++
 .../internals/ConsumerCoordinatorTest.java      | 18 ++---
 .../clients/consumer/internals/FetcherTest.java | 36 ++++-----
 .../internals/SubscriptionStateTest.java        | 81 ++++++++++----------
 .../clients/producer/KafkaProducerTest.java     |  1 -
 9 files changed, 142 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cd166f0..06a9239 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -629,6 +629,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * assign partitions. Topic subscriptions are not incremental. This list will replace the current
      * assignment (if there is one). Note that it is not possible to combine topic subscription with group management
      * with manual partition assignment through {@link #assign(List)}.
+     *
+     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
+     *
      * <p>
      * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
      * group and will trigger a rebalance operation if one of the following events trigger -
@@ -653,9 +656,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
         acquire();
         try {
-            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
-            this.subscriptions.subscribe(topics, listener);
-            metadata.setTopics(subscriptions.groupSubscription());
+            if (topics.isEmpty()) {
+                // treat subscribing to empty topic list as the same as unsubscribing
+                this.unsubscribe();
+            } else {
+                log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
+                this.subscriptions.subscribe(topics, listener);
+                metadata.setTopics(subscriptions.groupSubscription());
+            }
         } finally {
             release();
         }
@@ -666,6 +674,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * assign partitions. Topic subscriptions are not incremental. This list will replace the current
      * assignment (if there is one). It is not possible to combine topic subscription with group management
      * with manual partition assignment through {@link #assign(List)}.
+     *
+     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
+     *
      * <p>
      * This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which
      * uses a noop listener. If you need the ability to either seek to particular offsets, you should prefer
@@ -715,6 +726,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void unsubscribe() {
         acquire();
         try {
+            log.debug("Unsubscribed all topics or patterns and assigned partitions");
             this.subscriptions.unsubscribe();
             this.coordinator.resetGeneration();
             this.metadata.needMetadataForAllTopics(false);
@@ -739,7 +751,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         acquire();
         try {
             log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
-            this.subscriptions.assign(partitions);
+            this.subscriptions.assignFromUser(partitions);
             Set<String> topics = new HashSet<>();
             for (TopicPartition tp : partitions)
                 topics.add(tp.topic());

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 0242d7b..ed1c1e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -76,7 +76,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public void rebalance(Collection<TopicPartition> newAssignment) {
         // TODO: Rebalance callbacks
         this.records.clear();
-        this.subscriptions.changePartitionAssignment(newAssignment);
+        this.subscriptions.assignFromSubscribed(newAssignment);
     }
 
     @Override
@@ -112,7 +112,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Override
     public void assign(List<TopicPartition> partitions) {
         ensureNotClosed();
-        this.subscriptions.assign(partitions);
+        this.subscriptions.assignFromUser(partitions);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fc7e819..d6291bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -169,7 +169,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
         subscriptions.needRefreshCommits();
 
         // update partition assignment
-        subscriptions.changePartitionAssignment(assignment.partitions());
+        subscriptions.assignFromSubscribed(assignment.partitions());
 
         // give the assignor a chance to update internal state based on the received assignment
         assignor.onAssignment(assignment);

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6e79a7f..a9ff35f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -28,8 +28,8 @@ import java.util.regex.Pattern;
 
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer. A partition
- * is "assigned" either directly with {@link #assign(List)} (manual assignment)
- * or with {@link #changePartitionAssignment(Collection)} (automatic assignment).
+ * is "assigned" either directly with {@link #assignFromUser(Collection)} (manual assignment)
+ * or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription).
  *
  * Once assigned, the partition is not considered "fetchable" until its initial position has
  * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
@@ -129,12 +129,16 @@ public class SubscriptionState {
     }
 
     public void needReassignment() {
-        //
         this.groupSubscription.retainAll(subscription);
         this.needsPartitionAssignment = true;
     }
 
-    public void assign(List<TopicPartition> partitions) {
+    /**
+     * Change the assignment to the specified partitions provided by the user.
+     * Note that this is different from {@link #assignFromSubscribed(Collection)},
+     * whose input partitions are provided from the subscribed topics.
+     */
+    public void assignFromUser(Collection<TopicPartition> partitions) {
         if (!this.subscription.isEmpty() || this.subscribedPattern != null)
             throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
 
@@ -146,6 +150,22 @@ public class SubscriptionState {
                 addAssignedPartition(partition);
 
         this.assignment.keySet().retainAll(this.userAssignment);
+
+        this.needsPartitionAssignment = false;
+    }
+
+    /**
+     * Change the assignment to the specified partitions returned from the coordinator.
+     * Note that this is different from {@link #assignFromUser(Collection)}, which directly sets the assignment from user inputs.
+     */
+    public void assignFromSubscribed(Collection<TopicPartition> assignments) {
+        for (TopicPartition tp : assignments)
+            if (!this.subscription.contains(tp.topic()))
+                throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
+        this.assignment.clear();
+        for (TopicPartition tp: assignments)
+            addAssignedPartition(tp);
+        this.needsPartitionAssignment = false;
     }
 
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -306,16 +326,6 @@ public class SubscriptionState {
         return this.needsPartitionAssignment;
     }
 
-    public void changePartitionAssignment(Collection<TopicPartition> assignments) {
-        for (TopicPartition tp : assignments)
-            if (!this.subscription.contains(tp.topic()))
-                throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
-        this.assignment.clear();
-        for (TopicPartition tp: assignments)
-            addAssignedPartition(tp);
-        this.needsPartitionAssignment = false;
-    }
-
     public boolean isAssigned(TopicPartition tp) {
         return assignment.containsKey(tp);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7625218..983c45d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -17,15 +17,20 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.Properties;
 
 public class KafkaConsumerTest {
 
+    private final String topic = "test";
+    private final TopicPartition tp0 = new TopicPartition("test", 0);
+
     @Test
     public void testConstructorClose() throws Exception {
         Properties props = new Properties();
@@ -46,4 +51,31 @@ public class KafkaConsumerTest {
         }
         Assert.fail("should have caught an exception and returned");
     }
+
+    @Test
+    public void testSubscription() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSubscription");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+            props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+
+        consumer.subscribe(Collections.singletonList(topic));
+        Assert.assertEquals(Collections.singleton(topic), consumer.subscription());
+        Assert.assertTrue(consumer.assignment().isEmpty());
+
+        consumer.subscribe(Collections.<String>emptyList());
+        Assert.assertTrue(consumer.subscription().isEmpty());
+        Assert.assertTrue(consumer.assignment().isEmpty());
+
+        consumer.assign(Collections.singletonList(tp0));
+        Assert.assertTrue(consumer.subscription().isEmpty());
+        Assert.assertEquals(Collections.singleton(tp0), consumer.assignment());
+
+        consumer.unsubscribe();
+        Assert.assertTrue(consumer.subscription().isEmpty());
+        Assert.assertTrue(consumer.assignment().isEmpty());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 93994d7..b20277f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -193,7 +193,7 @@ public class ConsumerCoordinatorTest {
 
         // illegal_generation will cause re-partition
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+        subscriptions.assignFromSubscribed(Collections.singletonList(tp));
 
         time.sleep(sessionTimeoutMs);
         RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
@@ -217,7 +217,7 @@ public class ConsumerCoordinatorTest {
 
         // illegal_generation will cause re-partition
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
-        subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+        subscriptions.assignFromSubscribed(Collections.singletonList(tp));
 
         time.sleep(sessionTimeoutMs);
         RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
@@ -413,7 +413,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetOnly() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
@@ -430,7 +430,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetMetadata() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
@@ -473,7 +473,7 @@ public class ConsumerCoordinatorTest {
         // now switch to manual assignment
         subscriptions.unsubscribe();
         coordinator.resetGeneration();
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
 
         // the client should not reuse generation/memberId from auto-subscribed generation
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -612,7 +612,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
@@ -625,7 +625,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
@@ -639,7 +639,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -654,7 +654,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
         coordinator.refreshCommittedOffsetsIfNeeded();

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 8773f8c..957d8f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -110,7 +110,7 @@ public class FetcherTest {
     @Test
     public void testFetchNormal() {
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch
@@ -130,7 +130,7 @@ public class FetcherTest {
 
     @Test(expected = RecordTooLargeException.class)
     public void testFetchRecordTooLarge() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         // prepare large record
@@ -150,13 +150,13 @@ public class FetcherTest {
     @Test
     public void testFetchDuringRebalance() {
         subscriptions.subscribe(Arrays.asList(topicName), listener);
-        subscriptions.changePartitionAssignment(Arrays.asList(tp));
+        subscriptions.assignFromSubscribed(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
 
         // Now the rebalance happens and fetch positions are cleared
-        subscriptions.changePartitionAssignment(Arrays.asList(tp));
+        subscriptions.assignFromSubscribed(Arrays.asList(tp));
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
 
@@ -166,7 +166,7 @@ public class FetcherTest {
 
     @Test
     public void testInFlightFetchOnPausedPartition() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -179,7 +179,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchOnPausedPartition() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         subscriptions.pause(tp);
@@ -189,7 +189,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchNotLeaderForPartition() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -201,7 +201,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchUnknownTopicOrPartition() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -213,7 +213,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchOffsetOutOfRange() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -227,7 +227,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchedRecordsAfterSeek() {
-        subscriptionsNoAutoReset.assign(Arrays.asList(tp));
+        subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
         fetcherNoAutoReset.initFetches(cluster);
@@ -240,7 +240,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchOffsetOutOfRangeException() {
-        subscriptionsNoAutoReset.assign(Arrays.asList(tp));
+        subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
         fetcherNoAutoReset.initFetches(cluster);
@@ -259,7 +259,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchDisconnected() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -278,7 +278,7 @@ public class FetcherTest {
     public void testUpdateFetchPositionToCommitted() {
         // unless a specific reset is expected, the default behavior is to reset to the committed
         // position if one is present
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.committed(tp, new OffsetAndMetadata(5));
 
         fetcher.updateFetchPositions(Collections.singleton(tp));
@@ -289,7 +289,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionResetToDefaultOffset() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         // with no commit position, we should reset using the default strategy defined above (EARLIEST)
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
@@ -303,7 +303,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionResetToLatestOffset() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
@@ -317,7 +317,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionResetToEarliestOffset() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
@@ -331,7 +331,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionDisconnect() {
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
@@ -365,7 +365,7 @@ public class FetcherTest {
     @Test
     public void testQuotaMetrics() throws Exception {
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assign(Arrays.asList(tp));
+        subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index a0568ad..c5fce61 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -35,19 +35,22 @@ import org.junit.Test;
 public class SubscriptionStateTest {
 
     private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+    private final String topic = "test";
+    private final String topic1 = "test1";
     private final TopicPartition tp0 = new TopicPartition("test", 0);
     private final TopicPartition tp1 = new TopicPartition("test", 1);
     private final MockRebalanceListener rebalanceListener = new MockRebalanceListener();
 
     @Test
     public void partitionAssignment() {
-        state.assign(Arrays.asList(tp0));
+        state.assignFromUser(Arrays.asList(tp0));
         assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+        assertFalse(state.partitionAssignmentNeeded());
         state.committed(tp0, new OffsetAndMetadata(1));
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
         assertAllPositions(tp0, 1L);
-        state.assign(Arrays.<TopicPartition>asList());
+        state.assignFromUser(Arrays.<TopicPartition>asList());
         assertTrue(state.assignedPartitions().isEmpty());
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp0));
@@ -55,7 +58,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void partitionReset() {
-        state.assign(Arrays.asList(tp0));
+        state.assignFromUser(Arrays.asList(tp0));
         state.seek(tp0, 5);
         assertEquals(5L, (long) state.fetched(tp0));
         assertEquals(5L, (long) state.consumed(tp0));
@@ -73,16 +76,18 @@ public class SubscriptionStateTest {
 
     @Test
     public void topicSubscription() {
-        state.subscribe(Arrays.asList("test"), rebalanceListener);
+        state.subscribe(Arrays.asList(topic), rebalanceListener);
         assertEquals(1, state.subscription().size());
+        assertTrue(state.partitionAssignmentNeeded());
         assertTrue(state.assignedPartitions().isEmpty());
         assertTrue(state.partitionsAutoAssigned());
-        state.changePartitionAssignment(asList(tp0));
+        state.assignFromSubscribed(asList(tp0));
         state.seek(tp0, 1);
         state.committed(tp0, new OffsetAndMetadata(1));
         assertAllPositions(tp0, 1L);
-        state.changePartitionAssignment(asList(tp1));
+        state.assignFromSubscribed(asList(tp1));
         assertTrue(state.isAssigned(tp1));
+        assertFalse(state.partitionAssignmentNeeded());
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp1));
         assertEquals(Collections.singleton(tp1), state.assignedPartitions());
@@ -90,7 +95,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void partitionPause() {
-        state.assign(Arrays.asList(tp0));
+        state.assignFromUser(Arrays.asList(tp0));
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
         state.pause(tp0);
@@ -101,44 +106,24 @@ public class SubscriptionStateTest {
 
     @Test
     public void commitOffsetMetadata() {
-        state.assign(Arrays.asList(tp0));
+        state.assignFromUser(Arrays.asList(tp0));
         state.committed(tp0, new OffsetAndMetadata(5, "hi"));
 
         assertEquals(5, state.committed(tp0).offset());
         assertEquals("hi", state.committed(tp0).metadata());
     }
 
-    @Test
-    public void topicUnsubscription() {
-        final String topic = "test";
-        state.subscribe(Arrays.asList(topic), rebalanceListener);
-        assertEquals(1, state.subscription().size());
-        assertTrue(state.assignedPartitions().isEmpty());
-        assertTrue(state.partitionsAutoAssigned());
-        state.changePartitionAssignment(asList(tp0));
-        state.committed(tp0, new OffsetAndMetadata(1));
-        state.seek(tp0, 1);
-        assertAllPositions(tp0, 1L);
-        state.changePartitionAssignment(asList(tp1));
-        assertFalse(state.isAssigned(tp0));
-        assertEquals(Collections.singleton(tp1), state.assignedPartitions());
-
-        state.subscribe(Arrays.<String>asList(), rebalanceListener);
-        assertEquals(0, state.subscription().size());
-        assertTrue(state.assignedPartitions().isEmpty());
-    }
-
     @Test(expected = IllegalStateException.class)
     public void invalidConsumedPositionUpdate() {
-        state.subscribe(Arrays.asList("test"), rebalanceListener);
-        state.changePartitionAssignment(asList(tp0));
+        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        state.assignFromSubscribed(asList(tp0));
         state.consumed(tp0, 0);
     }
 
     @Test(expected = IllegalStateException.class)
     public void invalidFetchPositionUpdate() {
-        state.subscribe(Arrays.asList("test"), rebalanceListener);
-        state.changePartitionAssignment(asList(tp0));
+        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        state.assignFromSubscribed(asList(tp0));
         state.fetched(tp0, 0);
     }
 
@@ -160,46 +145,60 @@ public class SubscriptionStateTest {
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribeTopicAndPattern() {
-        state.subscribe(Arrays.asList("test"), rebalanceListener);
+        state.subscribe(Arrays.asList(topic), rebalanceListener);
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePartitionAndPattern() {
-        state.assign(Arrays.asList(new TopicPartition("test", 0)));
+        state.assignFromUser(Arrays.asList(tp0));
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePatternAndTopic() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
-        state.subscribe(Arrays.asList("test"), rebalanceListener);
+        state.subscribe(Arrays.asList(topic), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePatternAndPartition() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
-        state.assign(Arrays.asList(new TopicPartition("test", 0)));
+        state.assignFromUser(Arrays.asList(tp0));
     }
 
     @Test
     public void patternSubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
-        state.changeSubscription(Arrays.asList("test", "test1"));
+        state.changeSubscription(Arrays.asList(topic, topic1));
 
         assertEquals(
                 "Expected subscribed topics count is incorrect", 2, state.subscription().size());
     }
 
     @Test
-    public void patternUnsubscription() {
+    public void unsubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
-        state.changeSubscription(Arrays.asList("test", "test1"));
+        state.changeSubscription(Arrays.asList(topic, topic1));
+        assertTrue(state.partitionAssignmentNeeded());
+
+        state.assignFromSubscribed(asList(tp1));
+        assertEquals(Collections.singleton(tp1), state.assignedPartitions());
+        assertFalse(state.partitionAssignmentNeeded());
 
         state.unsubscribe();
+        assertEquals(0, state.subscription().size());
+        assertTrue(state.assignedPartitions().isEmpty());
+        assertTrue(state.partitionAssignmentNeeded());
 
-        assertEquals(
-                "Expected subscribed topics count is incorrect", 0, state.subscription().size());
+        state.assignFromUser(Arrays.asList(tp0));
+        assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+        assertFalse(state.partitionAssignmentNeeded());
+
+        state.unsubscribe();
+        assertEquals(0, state.subscription().size());
+        assertTrue(state.assignedPartitions().isEmpty());
+        assertTrue(state.partitionAssignmentNeeded());
     }
 
     private static class MockRebalanceListener implements ConsumerRebalanceListener {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index d1759ce..b044cf4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -31,7 +31,6 @@ import java.util.HashMap;
 
 public class KafkaProducerTest {
 
-
     @Test
     public void testConstructorFailureCloseResource() {
         Properties props = new Properties();