Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/27 02:18:48 UTC

[2/2] kafka git commit: KAFKA-2388: refactor KafkaConsumer subscribe API

KAFKA-2388: refactor KafkaConsumer subscribe API

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Edward Ribeiro, Onur Karaman, Ismael Juma, Guozhang Wang

Closes #139 from hachikuji/KAFKA-2388 and squashes the following commits:

377c67e [Jason Gustafson] KAFKA-2388; refactor KafkaConsumer subscribe API


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/35eaef7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/35eaef7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/35eaef7b

Branch: refs/heads/trunk
Commit: 35eaef7bb4ebcf6b209312db774564451b052ca9
Parents: 03f850f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Aug 26 17:20:51 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Aug 26 17:20:51 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java |  19 +-
 .../apache/kafka/clients/consumer/Consumer.java |  20 +--
 .../kafka/clients/consumer/ConsumerConfig.java  |  12 --
 .../consumer/ConsumerRebalanceCallback.java     |  95 ----------
 .../consumer/ConsumerRebalanceListener.java     |  97 ++++++++++
 .../kafka/clients/consumer/KafkaConsumer.java   | 175 +++++++++----------
 .../kafka/clients/consumer/MockConsumer.java    |  27 ++-
 .../clients/consumer/internals/Coordinator.java |  30 ++--
 .../NoOpConsumerRebalanceCallback.java          |  30 ----
 .../NoOpConsumerRebalanceListener.java          |  30 ++++
 .../consumer/internals/SubscriptionState.java   | 158 +++++++++++------
 .../clients/consumer/KafkaConsumerTest.java     |   2 +-
 .../clients/consumer/MockConsumerTest.java      |  25 ++-
 .../consumer/internals/CoordinatorTest.java     |  47 ++---
 .../clients/consumer/internals/FetcherTest.java |  29 +--
 .../internals/SubscriptionStateTest.java        |  30 ++--
 .../kafka/copycat/runtime/WorkerSinkTask.java   |   6 +-
 .../copycat/runtime/WorkerSinkTaskTest.java     |   2 +-
 .../scala/kafka/consumer/BaseConsumer.scala     |   3 +-
 .../scala/kafka/tools/ConsumerPerformance.scala |   2 +-
 .../kafka/api/ConsumerBounceTest.scala          |   4 +-
 .../integration/kafka/api/ConsumerTest.scala    |  83 ++++-----
 .../integration/kafka/api/QuotasTest.scala      |   2 +-
 .../integration/kafka/api/SSLConsumerTest.scala |  31 +---
 .../kafka/consumer/PartitionAssignorTest.scala  |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  10 +-
 26 files changed, 498 insertions(+), 473 deletions(-)
----------------------------------------------------------------------
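At a glance, the refactor replaces the varargs subscribe/unsubscribe calls with list-based subscribe/assign and splits the old subscriptions() accessor into subscription() and assignment(). A minimal before/after sketch of the topic-subscription path, assuming a local broker at localhost:9092, a group id, and String deserializers (all assumptions, not part of this commit):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscribeApiSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Old API (removed by this commit):
        //   consumer.subscribe("foo", "bar");    // incremental, varargs
        //   consumer.unsubscribe("foo");         // incremental removal
        //   consumer.subscriptions();            // assigned partitions

        // New API: the subscription is replaced wholesale on each call.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        System.out.println("topics subscribed:   " + consumer.subscription());
        System.out.println("partitions assigned: " + consumer.assignment()); // empty until a rebalance completes
        consumer.close();
    }
}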


http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0387f26..3f3a5f5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -12,14 +12,15 @@
  */
 package org.apache.kafka.clients;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * A class encapsulating some of the logic around metadata.
  * <p>
@@ -118,12 +119,14 @@ public final class Metadata {
     }
 
     /**
-     * Add one or more topics to maintain metadata for
+     * Replace the current set of maintained topics with the one provided
+     * @param topics The collection of topics to maintain metadata for
      */
-    public synchronized void addTopics(String... topics) {
-        for (String topic : topics)
-            this.topics.add(topic);
-        requestUpdate();
+    public synchronized void setTopics(Collection<String> topics) {
+        if (!this.topics.containsAll(topics))
+            requestUpdate();
+        this.topics.clear();
+        this.topics.addAll(topics);
     }
 
     /**
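A note on the new setTopics semantics above: a metadata update is requested only when the provided collection contains a topic that was not already being tracked; merely dropping topics does not force a refresh. An illustrative stand-alone sketch of that rule (not the internal Metadata class itself):

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

public class SetTopicsRuleSketch {
    private final Set<String> topics = new HashSet<>();
    private boolean updateRequested = false;

    public void setTopics(Collection<String> newTopics) {
        // An update is requested only if the new collection introduces a topic
        // that was not already tracked; shrinking the set does not force one.
        if (!this.topics.containsAll(newTopics))
            updateRequested = true;
        this.topics.clear();
        this.topics.addAll(newTopics);
    }

    public static void main(String[] args) {
        SetTopicsRuleSketch m = new SetTopicsRuleSketch();
        m.setTopics(Arrays.asList("foo", "bar")); // introduces new topics
        System.out.println(m.updateRequested);    // true
        m.updateRequested = false;
        m.setTopics(Arrays.asList("foo"));        // subset of what we already track
        System.out.println(m.updateRequested);    // false
    }
}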

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 76834ad..509918c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -31,29 +31,29 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 public interface Consumer<K, V> extends Closeable {
     
     /**
-     * @see KafkaConsumer#subscriptions()
+     * @see KafkaConsumer#assignment()
      */
-    public Set<TopicPartition> subscriptions();
+    public Set<TopicPartition> assignment();
 
     /**
-     * @see KafkaConsumer#subscribe(String...)
+     * @see KafkaConsumer#subscription()
      */
-    public void subscribe(String... topics);
+    public Set<String> subscription();
 
     /**
-     * @see KafkaConsumer#subscribe(TopicPartition...)
+     * @see KafkaConsumer#subscribe(List)
      */
-    public void subscribe(TopicPartition... partitions);
+    public void subscribe(List<String> topics);
 
     /**
-     * @see KafkaConsumer#unsubscribe(String...)
+     * @see KafkaConsumer#subscribe(List, ConsumerRebalanceListener)
      */
-    public void unsubscribe(String... topics);
+    public void subscribe(List<String> topics, ConsumerRebalanceListener callback);
 
     /**
-     * @see KafkaConsumer#unsubscribe(TopicPartition...)
+     * @see KafkaConsumer#assign(List)
      */
-    public void unsubscribe(TopicPartition... partitions);
+    public void assign(List<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#poll(long)
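Because these signature changes land on the Consumer interface itself, code written against the interface picks them up uniformly for KafkaConsumer and MockConsumer. A small sketch of a helper using only the refactored methods (the class and method names here are illustrative, not from the commit):

import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class PollOnceHelper {
    // Subscribes (replacing any previous subscription) and counts the records from a single poll.
    public static <K, V> int pollOnce(Consumer<K, V> consumer, List<String> topics, long timeoutMs) {
        consumer.subscribe(topics);
        ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
        int count = 0;
        for (ConsumerRecord<K, V> record : records)
            count++;
        return count;
    }
}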

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 9c9510a..b9a2d4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -13,7 +13,6 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -144,12 +143,6 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
     /**
-     * <code>rebalance.callback.class</code>
-     */
-    public static final String CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG = "rebalance.callback.class";
-    private static final String CONSUMER_REBALANCE_CALLBACK_CLASS_DOC = "A user-provided callback to execute when partition assignments change.";
-
-    /**
      * <code>check.crcs</code>
      */
     public static final String CHECK_CRCS_CONFIG = "check.crcs";
@@ -259,11 +252,6 @@ public class ConsumerConfig extends AbstractConfig {
                                         in("latest", "earliest", "none"),
                                         Importance.MEDIUM,
                                         AUTO_OFFSET_RESET_DOC)
-                                .define(CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        NoOpConsumerRebalanceCallback.class,
-                                        Importance.LOW,
-                                        CONSUMER_REBALANCE_CALLBACK_CLASS_DOC)
                                 .define(CHECK_CRCS_CONFIG,
                                         Type.BOOLEAN,
                                         true,
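With rebalance.callback.class gone, the listener is no longer configured by class name (or passed to the constructor); it is handed directly to subscribe(). A wiring sketch, assuming a local broker, a group id, and String deserializers (none of which come from this commit):

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ListenerWiringSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Old wiring (removed): props.put("rebalance.callback.class", "com.example.MyCallback");
        // New wiring: pass the listener object per subscription.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Consumer<?, ?> c, Collection<TopicPartition> partitions) {
                System.out.println("assigned: " + partitions);
            }

            @Override
            public void onPartitionsRevoked(Consumer<?, ?> c, Collection<TopicPartition> partitions) {
                System.out.println("revoked: " + partitions);
            }
        });
        consumer.close();
    }
}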

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
deleted file mode 100644
index ff3f50f..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import java.util.Collection;
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the
- * consumer changes.
- * <p>
- * This is applicable when the consumer is having Kafka auto-manage group membership, if the consumer's directly subscribe to partitions
- * those partitions will never be reassigned and this callback is not applicable.
- * <p>
- * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group changes or the subscription
- * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
- * <p>
- * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
- * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
- * the offset gets saved.
- * <p>
- * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
- * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
- * number of page views per users for each five minute window. Let's say the topic is partitioned by the user id so that
- * all events for a particular user will go to a single consumer instance. The consumer can keep in memory a running
- * tally of actions per user and only flush these out to a remote data store when it's cache gets to big. However if a
- * partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over
- * consumption.
- * <p>
- * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
- * <p>
- * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to 
- * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the 
- * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
- * partition has their {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
- * <p>
- * Here is pseudo-code for a callback implementation for saving offsets:
- * <pre>
- * {@code
- *   public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
- *       public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- *           // read the offsets from an external store using some custom code not described here
- *           for(TopicPartition partition: partitions)
- *              consumer.position(partition, readOffsetFromExternalStore(partition));
- *       }      
- *       public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- *           // save the offsets in an external store using some custom code not described here
- *           for(TopicPartition partition: partitions)
- *              saveOffsetInExternalStore(consumer.position(partition));
- *       }
- *   }
- * }
- * </pre>
- */
-public interface ConsumerRebalanceCallback {
-
-    /**
-     * A callback method the user can implement to provide handling of customized offsets on completion of a successful
-     * partition re-assignement. This method will be called after an offset re-assignement completes and before the
-     * consumer starts fetching data.
-     * <p>
-     * It is guaranteed that all the processes in a consumer group will execute their
-     * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
-     * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
-     *
-     * @param consumer Reference to the consumer for convenience
-     * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
-     *            assigned to the consumer)
-     */
-    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
-
-    /**
-     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
-     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
-     * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
-     * custom offset store to prevent duplicate data
-     * <p>
-     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
-     *
-     * @param consumer  Reference to the consumer for convenience
-     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
-     */
-    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
new file mode 100644
index 0000000..2f2591c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collection;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the
+ * consumer changes.
+ * <p>
+ * This is applicable when the consumer uses Kafka's group management functionality. If the consumer directly assigns
+ * partitions, those partitions will never be reassigned and this callback is not applicable.
+ * <p>
+ * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription
+ * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
+ * Rebalances can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is
+ * administratively adjusted).
+ * <p>
+ * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
+ * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
+ * the offset gets saved.
+ * <p>
+ * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
+ * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
+ * number of page views per user for each five-minute window. Let's say the topic is partitioned by the user id so that
+ * all events for a particular user will go to a single consumer instance. The consumer can keep in memory a running
+ * tally of actions per user and only flush these out to a remote data store when its cache gets too big. However, if a
+ * partition is reassigned, it may want to automatically trigger a flush of this cache before the new owner takes over
+ * consumption.
+ * <p>
+ * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
+ * <p>
+ * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to 
+ * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the 
+ * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
+ * partition has their {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
+ * <p>
+ * Here is pseudo-code for a callback implementation for saving offsets:
+ * <pre>
+ * {@code
+ *   public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
+ *       public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ *           // read the offsets from an external store using some custom code not described here
+ *           for(TopicPartition partition: partitions)
+ *              consumer.seek(partition, readOffsetFromExternalStore(partition));
+ *       }      
+ *       public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ *           // save the offsets in an external store using some custom code not described here
+ *           for(TopicPartition partition: partitions)
+ *              saveOffsetInExternalStore(consumer.position(partition));
+ *       }
+ *   }
+ * }
+ * </pre>
+ */
+public interface ConsumerRebalanceListener {
+
+    /**
+     * A callback method the user can implement to provide handling of customized offsets on completion of a successful
+     * partition re-assignment. This method will be called after the partition re-assignment completes and before the
+     * consumer starts fetching data.
+     * <p>
+     * It is guaranteed that all the processes in a consumer group will execute their
+     * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
+     * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
+     *
+     * @param consumer Reference to the consumer for convenience
+     * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
+     *            assigned to the consumer)
+     */
+    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+
+    /**
+     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
+     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
+     * stops fetching data. It is recommended that offsets be committed in this callback to either Kafka or a
+     * custom offset store to prevent duplicate data.
+     * <p>
+     * For examples on usage of this API, see the Usage Examples section of {@link KafkaConsumer KafkaConsumer}.
+     *
+     * @param consumer  Reference to the consumer for convenience
+     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
+     */
+    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+}
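The pseudo-code in the Javadoc above, fleshed out into a compilable form. The external-store helpers are hypothetical placeholders, not part of this commit; only the listener signatures and the seek/position calls come from the interface itself.

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Restore each partition's position from the external store before fetching resumes.
        for (TopicPartition partition : partitions)
            consumer.seek(partition, readOffsetFromExternalStore(partition));
    }

    @Override
    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Persist the current position of each partition before it is handed to another group member.
        for (TopicPartition partition : partitions)
            saveOffsetInExternalStore(partition, consumer.position(partition));
    }

    // Hypothetical storage helpers; a real implementation would talk to a database or similar.
    private long readOffsetFromExternalStore(TopicPartition partition) {
        return 0L;
    }

    private void saveOffsetInExternalStore(TopicPartition partition, long offset) {
        // no-op placeholder
    }
}

An instance of such a class would be passed as the second argument of subscribe(List, ConsumerRebalanceListener) on the consumer whose partitions it manages.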

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8b54acd..938981c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Coordinator;
 import org.apache.kafka.clients.consumer.internals.DelayedTask;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -46,6 +47,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -229,8 +231,8 @@ import static org.apache.kafka.common.utils.Utils.min;
  *     String topic = &quot;foo&quot;;
  *     TopicPartition partition0 = new TopicPartition(topic, 0);
  *     TopicPartition partition1 = new TopicPartition(topic, 1);
- *     consumer.subscribe(partition0);
- *     consumer.subscribe(partition1);
+ *     consumer.assign(Arrays.asList(partition0, partition1));
  * </pre>
  *
  * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
@@ -272,13 +274,13 @@ import static org.apache.kafka.common.utils.Utils.min;
  * search index use case described above). If the partition assignment is done automatically special care will also be
  * needed to handle the case where partition assignments change. This can be handled using a special callback specified
  * using <code>rebalance.callback.class</code>, which specifies an implementation of the interface
- * {@link ConsumerRebalanceCallback}. When partitions are taken from a consumer the consumer will want to commit its
+ * {@link ConsumerRebalanceListener}. When partitions are taken from a consumer the consumer will want to commit its
  * offset for those partitions by implementing
- * {@link ConsumerRebalanceCallback#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
  * consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
- * to that position by implementing {@link ConsumerRebalanceCallback#onPartitionsAssigned(Consumer, Collection)}.
+ * to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Consumer, Collection)}.
  * <p>
- * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for
+ * Another common use for {@link ConsumerRebalanceListener} is to flush any caches the application maintains for
  * partitions that are moved elsewhere.
  *
  * <h4>Controlling The Consumer's Position</h4>
@@ -437,29 +439,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param configs The consumer configs
      */
     public KafkaConsumer(Map<String, Object> configs) {
-        this(configs, null, null, null);
+        this(configs, null, null);
     }
 
     /**
      * A consumer is instantiated by providing a set of key-value pairs as configuration, a
-     * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
+     * {@link ConsumerRebalanceListener} implementation, a key and a value {@link Deserializer}.
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
      *
      * @param configs The consumer configs
-     * @param callback A callback interface that the user can implement to manage customized offsets on the start and
-     *            end of every rebalance operation.
      * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
      *            won't be called in the consumer when the deserializer is passed in directly.
      * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
      *            won't be called in the consumer when the deserializer is passed in directly.
      */
     public KafkaConsumer(Map<String, Object> configs,
-                         ConsumerRebalanceCallback callback,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
         this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
-            callback,
             keyDeserializer,
             valueDeserializer);
     }
@@ -471,43 +469,35 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * {@link ConsumerConfig}
      */
     public KafkaConsumer(Properties properties) {
-        this(properties, null, null, null);
+        this(properties, null, null);
     }
 
     /**
      * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
-     * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
+     * {@link ConsumerRebalanceListener} implementation, a key and a value {@link Deserializer}.
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
      *
      * @param properties The consumer configuration properties
-     * @param callback A callback interface that the user can implement to manage customized offsets on the start and
-     *            end of every rebalance operation.
      * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
      *            won't be called in the consumer when the deserializer is passed in directly.
      * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
      *            won't be called in the consumer when the deserializer is passed in directly.
      */
     public KafkaConsumer(Properties properties,
-                         ConsumerRebalanceCallback callback,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
         this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
-             callback,
              keyDeserializer,
              valueDeserializer);
     }
 
     @SuppressWarnings("unchecked")
     private KafkaConsumer(ConsumerConfig config,
-                          ConsumerRebalanceCallback callback,
                           Deserializer<K> keyDeserializer,
                           Deserializer<V> valueDeserializer) {
         try {
             log.debug("Starting the Kafka consumer");
-            if (callback == null)
-                callback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
-                        ConsumerRebalanceCallback.class);
             this.time = new SystemTime();
             this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
             this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
@@ -552,8 +542,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metricsTags,
                     this.time,
                     requestTimeoutMs,
-                    retryBackoffMs,
-                    wrapRebalanceCallback(callback));
+                    retryBackoffMs);
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
@@ -600,23 +589,41 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to
-     * partitions using {@link #subscribe(TopicPartition...)} then this will simply return the list of partitions that
-     * were subscribed to. If subscription was done by specifying only the topic using {@link #subscribe(String...)}
-     * then this will give the set of topics currently assigned to the consumer (which may be none if the assignment
-     * hasn't happened yet, or the partitions are in the process of getting reassigned).
+     * The set of partitions currently assigned to this consumer. If subscription happened by directly assigning
+     * partitions using {@link #assign(List)} then this will simply return the same partitions that
+     * were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
+     * to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
+     * process of getting reassigned).
+     * @return The set of partitions currently assigned to this consumer
      */
-    public Set<TopicPartition> subscriptions() {
+    public Set<TopicPartition> assignment() {
         acquire();
         try {
-            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+            return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
         } finally {
             release();
         }
     }
 
     /**
-     * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
+     * Get the current subscription. Will return the same topics used in the most recent call to
+     * {@link #subscribe(List, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
+     * @return The set of topics currently subscribed to
+     */
+    public Set<String> subscription() {
+        acquire();
+        try {
+            return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Subscribe to the given list of topics and use the consumer's group management functionality to
+     * assign partitions. Topic subscriptions are not incremental. This list will replace the current
+     * assignment (if there is one). Note that it is not possible to combine topic subscription with group management
+     * with manual partition assignment through {@link #assign(List)}.
      * <p>
      * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
      * group and will trigger a rebalance operation if one of the following events trigger -
@@ -626,82 +633,76 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <li>An existing member of the consumer group dies
      * <li>A new member is added to an existing consumer group via the join API
      * </ul>
+     * <p>
+     * When any of these events are triggered, the provided listener will be invoked first to indicate that
+     * the consumer's assignment has been revoked, and then again when the new assignment has been received.
+     * Note that this listener will immediately override any listener set in a previous call to subscribe.
+     * It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics
+     * subscribed in this call. See {@link ConsumerRebalanceListener} for more details.
      *
-     * @param topics A variable list of topics that the consumer wants to subscribe to
+     * @param topics The list of topics to subscribe to
+     * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
+     *                 subscribed topics
      */
     @Override
-    public void subscribe(String... topics) {
+    public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
         acquire();
         try {
             log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
-            for (String topic : topics)
-                this.subscriptions.subscribe(topic);
-            metadata.addTopics(topics);
+            this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
+            metadata.setTopics(topics);
         } finally {
             release();
         }
     }
 
     /**
-     * Incrementally subscribes to a specific topic partition and does not use the consumer's group management
-     * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
-     * metadata change.
+     * Subscribe to the given list of topics and use the consumer's group management functionality to
+     * assign partitions. Topic subscriptions are not incremental. This list will replace the current
+     * assignment (if there is one). It is not possible to combine topic subscription with group management
+     * with manual partition assignment through {@link #assign(List)}.
      * <p>
+     * This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which
+     * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer
+     * {@link #subscribe(List, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
+     * to be reset. You should also prefer to provide your own listener if you are doing your own offset
+     * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
      *
-     * @param partitions Partitions to incrementally subscribe to
+     * @param topics The list of topics to subscribe to
      */
     @Override
-    public void subscribe(TopicPartition... partitions) {
-        acquire();
-        try {
-            log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
-            for (TopicPartition tp : partitions) {
-                this.subscriptions.subscribe(tp);
-                metadata.addTopics(tp.topic());
-            }
-        } finally {
-            release();
-        }
-    }
-
-    /**
-     * Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not
-     * be returned from the next {@link #poll(long) poll()} onwards
-     *
-     * @param topics Topics to unsubscribe from
-     */
-    public void unsubscribe(String... topics) {
-        acquire();
-        try {
-            log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
-            // throw an exception if the topic was never subscribed to
-            for (String topic : topics)
-                this.subscriptions.unsubscribe(topic);
-        } finally {
-            release();
-        }
+    public void subscribe(List<String> topics) {
+        subscribe(topics, new NoOpConsumerRebalanceListener());
     }
 
     /**
-     * Unsubscribe from the specific topic partitions. records for these partitions will not be returned from the next
-     * {@link #poll(long) poll()} onwards
+     * Assign a list of partitions to this consumer. This interface does not allow for incremental assignment
+     * and will replace the previous assignment (if there is one).
+     * <p>
+     * Manual topic assignment through this method does not use the consumer's group management
+     * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
+     * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(List)}
+     * and group assignment with {@link #subscribe(List, ConsumerRebalanceListener)}.
      *
-     * @param partitions Partitions to unsubscribe from
+     * @param partitions The list of partitions to assign to this consumer
      */
-    public void unsubscribe(TopicPartition... partitions) {
+    @Override
+    public void assign(List<TopicPartition> partitions) {
         acquire();
         try {
-            log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
-            // throw an exception if the partition was never subscribed to
-            for (TopicPartition partition : partitions)
-                this.subscriptions.unsubscribe(partition);
+            log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
+            this.subscriptions.assign(partitions);
+            Set<String> topics = new HashSet<>();
+            for (TopicPartition tp : partitions)
+                topics.add(tp.topic());
+            metadata.setTopics(topics);
         } finally {
             release();
         }
     }
 
     /**
-     * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have
+     * Fetches data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
      * subscribed to any topics or partitions before polling for data.
      * <p>
      * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
@@ -736,7 +737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // handling the fetched records.
                     fetcher.initFetches(metadata.fetch());
                     client.poll(0);
-                    return new ConsumerRecords<K, V>(records);
+                    return new ConsumerRecords<>(records);
                 }
 
                 remaining -= end - start;
@@ -1117,20 +1118,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-    private Coordinator.RebalanceCallback wrapRebalanceCallback(final ConsumerRebalanceCallback callback) {
-        return new Coordinator.RebalanceCallback() {
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                callback.onPartitionsAssigned(KafkaConsumer.this, partitions);
-            }
-
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                callback.onPartitionsRevoked(KafkaConsumer.this, partitions);
-            }
-        };
-    }
-
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
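For the manual-assignment path, an end-to-end sketch: assign() takes the full list in one call and replaces any previous assignment, after which seek() and poll() behave as before. The broker address, topic name, and starting offset are assumptions, not values from the commit.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignmentSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("enable.auto.commit", "false");         // no group management is used here
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition partition0 = new TopicPartition("foo", 0);
        TopicPartition partition1 = new TopicPartition("foo", 1);

        // assign() is not incremental: both partitions go in a single call.
        consumer.assign(Arrays.asList(partition0, partition1));
        consumer.seek(partition0, 42L); // assumed starting offset for partition 0

        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(record.partition() + " @ " + record.offset() + ": " + record.value());
        consumer.close();
    }
}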

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index b07e760..e33f120 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -48,34 +49,30 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
     
     @Override
-    public synchronized Set<TopicPartition> subscriptions() {
+    public synchronized Set<TopicPartition> assignment() {
         return this.subscriptions.assignedPartitions();
     }
 
     @Override
-    public synchronized void subscribe(String... topics) {
-        ensureNotClosed();
-        for (String topic : topics)
-            this.subscriptions.subscribe(topic);
+    public synchronized Set<String> subscription() {
+        return this.subscriptions.subscription();
     }
 
     @Override
-    public synchronized void subscribe(TopicPartition... partitions) {
-        ensureNotClosed();
-        for (TopicPartition partition : partitions)
-            this.subscriptions.subscribe(partition);
+    public synchronized void subscribe(List<String> topics) {
+        subscribe(topics, new NoOpConsumerRebalanceListener());
     }
 
-    public synchronized void unsubscribe(String... topics) {
+    @Override
+    public synchronized void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
-        for (String topic : topics)
-            this.subscriptions.unsubscribe(topic);
+        this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
     }
 
-    public synchronized void unsubscribe(TopicPartition... partitions) {
+    @Override
+    public synchronized void assign(List<TopicPartition> partitions) {
         ensureNotClosed();
-        for (TopicPartition partition : partitions)
-            this.subscriptions.unsubscribe(partition);
+        this.subscriptions.assign(partitions);
     }
 
     @Override
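Because MockConsumer now routes through the same SubscriptionState calls, test code can exercise the refactored methods directly. A sketch follows; the OffsetResetStrategy constructor argument is an assumption about this vintage of MockConsumer rather than something shown in this diff.

import java.util.Arrays;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerSketch {
    public static void main(String[] args) {
        // Constructor argument is an assumption about this version of MockConsumer.
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

        // Topic subscription goes through the same refactored method as KafkaConsumer,
        // so test code exercises the same call sites as production code.
        consumer.subscribe(Arrays.asList("foo"));
        System.out.println(consumer.subscription()); // [foo]

        // A fresh instance is needed for manual assignment, since the two modes are exclusive.
        MockConsumer<String, String> assigned = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        assigned.assign(Arrays.asList(new TopicPartition("foo", 0)));
        System.out.println(assigned.assignment()); // [foo-0]
    }
}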

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 70442aa..b804796 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -46,7 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -72,7 +71,6 @@ public final class Coordinator {
     private final CoordinatorMetrics sensors;
     private final long requestTimeoutMs;
     private final long retryBackoffMs;
-    private final RebalanceCallback rebalanceCallback;
     private Node consumerCoordinator;
     private String consumerId;
     private int generation;
@@ -92,8 +90,7 @@ public final class Coordinator {
                        Map<String, String> metricTags,
                        Time time,
                        long requestTimeoutMs,
-                       long retryBackoffMs,
-                       RebalanceCallback rebalanceCallback) {
+                       long retryBackoffMs) {
 
         this.client = client;
         this.time = time;
@@ -109,7 +106,6 @@ public final class Coordinator {
         this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
         this.requestTimeoutMs = requestTimeoutMs;
         this.retryBackoffMs = retryBackoffMs;
-        this.rebalanceCallback = rebalanceCallback;
     }
 
     /**
@@ -159,25 +155,27 @@ public final class Coordinator {
         if (!subscriptions.partitionAssignmentNeeded())
             return;
 
-        // execute the user's callback before rebalance
+        SubscriptionState.RebalanceListener listener = subscriptions.listener();
+
+        // execute the user's listener before rebalance
         log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
         try {
-            Set<TopicPartition> revoked = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
-            rebalanceCallback.onPartitionsRevoked(revoked);
+            Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
+            listener.onPartitionsRevoked(revoked);
         } catch (Exception e) {
-            log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+            log.error("User provided listener " + listener.underlying().getClass().getName()
                     + " failed on partition revocation: ", e);
         }
 
         reassignPartitions();
 
-        // execute the user's callback after rebalance
+        // execute the user's listener after rebalance
         log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
         try {
-            Set<TopicPartition> assigned = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
-            rebalanceCallback.onPartitionsAssigned(assigned);
+            Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
+            listener.onPartitionsAssigned(assigned);
         } catch (Exception e) {
-            log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+            log.error("User provided listener " + listener.underlying().getClass().getName()
                     + " failed on partition assignment: ", e);
         }
     }
@@ -293,7 +291,7 @@ public final class Coordinator {
             return RequestFuture.coordinatorNotAvailable();
 
         // send a join group request to the coordinator
-        List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscribedTopics());
+        List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscription());
         log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
 
         JoinGroupRequest request = new JoinGroupRequest(groupId,
@@ -713,10 +711,6 @@ public final class Coordinator {
         }
     }
 
-    public interface RebalanceCallback {
-        void onPartitionsAssigned(Collection<TopicPartition> partitions);
-        void onPartitionsRevoked(Collection<TopicPartition> partitions);
-    }
 
     private class CoordinatorMetrics {
         public final Metrics metrics;
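The SubscriptionState.RebalanceListener type and the wrapListener factory referenced by the Coordinator are not shown in this excerpt (the SubscriptionState diff below is truncated). A hypothetical reconstruction, inferred from the removed KafkaConsumer.wrapRebalanceCallback and the underlying() call above; the real class in this commit may differ in detail.

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the wrapper the Coordinator consumes: it binds the user's
// ConsumerRebalanceListener to a Consumer instance and exposes the original listener
// for error reporting via underlying().
abstract class RebalanceListenerSketch {
    public abstract void onPartitionsAssigned(Collection<TopicPartition> partitions);
    public abstract void onPartitionsRevoked(Collection<TopicPartition> partitions);
    public abstract ConsumerRebalanceListener underlying();

    static RebalanceListenerSketch wrap(final Consumer<?, ?> consumer, final ConsumerRebalanceListener listener) {
        return new RebalanceListenerSketch() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                listener.onPartitionsAssigned(consumer, partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                listener.onPartitionsRevoked(consumer, partitions);
            }

            @Override
            public ConsumerRebalanceListener underlying() {
                return listener;
            }
        };
    }
}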

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
deleted file mode 100644
index c06ab3a..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer.internals;
-
-import java.util.Collection;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
-import org.apache.kafka.common.TopicPartition;
-
-public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
-
-    @Override
-    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
-
-    @Override
-    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
new file mode 100644
index 0000000..d88d71c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Collection;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+public class NoOpConsumerRebalanceListener implements ConsumerRebalanceListener {
+
+    @Override
+    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+
+    @Override
+    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6788ee6..ec6b424 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -12,19 +12,22 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer. A partition
- * is "assigned" either directly with {@link #subscribe(TopicPartition)} (manual assignment)
+ * is "assigned" either directly with {@link #assign(List)} (manual assignment)
  * or with {@link #changePartitionAssignment(List)} (automatic assignment).
  *
  * Once assigned, the partition is not considered "fetchable" until its initial position has
@@ -36,8 +39,7 @@ import java.util.Set;
  * used. You can also query the pause state independently with {@link #isPaused(TopicPartition)}.
  *
  * Note that pause state as well as fetch/consumed positions are not preserved when partition
- * assignment is changed either with {@link #unsubscribe(TopicPartition)} or
- * {@link #changePartitionAssignment(List)}.
+ * assignment is changed whether directly by the user or through a group rebalance.
  *
  * This class also maintains a cache of the latest commit position for each of the assigned
  * partitions. This is updated through {@link #committed(TopicPartition, long)} and can be used
@@ -46,13 +48,13 @@ import java.util.Set;
 public class SubscriptionState {
 
     /* the list of topics the user has requested */
-    private final Set<String> subscribedTopics;
+    private final Set<String> subscription;
 
     /* the list of partitions the user has requested */
-    private final Set<TopicPartition> subscribedPartitions;
+    private final Set<TopicPartition> userAssignment;
 
     /* the list of partitions currently assigned */
-    private final Map<TopicPartition, TopicPartitionState> assignedPartitions;
+    private final Map<TopicPartition, TopicPartitionState> assignment;
 
     /* do we need to request a partition assignment from the coordinator? */
     private boolean needsPartitionAssignment;
@@ -63,64 +65,66 @@ public class SubscriptionState {
     /* Default offset reset strategy */
     private final OffsetResetStrategy defaultResetStrategy;
 
+    /* Listener to be invoked when assignment changes */
+    private RebalanceListener listener;
+
     public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
         this.defaultResetStrategy = defaultResetStrategy;
-        this.subscribedTopics = new HashSet<>();
-        this.subscribedPartitions = new HashSet<>();
-        this.assignedPartitions = new HashMap<>();
+        this.subscription = new HashSet<>();
+        this.userAssignment = new HashSet<>();
+        this.assignment = new HashMap<>();
         this.needsPartitionAssignment = false;
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
     }
 
-    public void subscribe(String topic) {
-        if (!this.subscribedPartitions.isEmpty())
+    public void subscribe(List<String> topics, RebalanceListener listener) {
+        if (listener == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be null");
+
+        if (!this.userAssignment.isEmpty())
             throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
-        if (!this.subscribedTopics.contains(topic)) {
-            this.subscribedTopics.add(topic);
+
+        this.listener = listener;
+
+        if (!this.subscription.equals(new HashSet<>(topics))) {
+            this.subscription.clear();
+            this.subscription.addAll(topics);
             this.needsPartitionAssignment = true;
-        }
-    }
 
-    public void unsubscribe(String topic) {
-        if (!this.subscribedTopics.contains(topic))
-            throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
-        this.subscribedTopics.remove(topic);
-        this.needsPartitionAssignment = true;
-        final List<TopicPartition> existingAssignedPartitions = new ArrayList<>(assignedPartitions());
-        for (TopicPartition tp: existingAssignedPartitions)
-            if (topic.equals(tp.topic()))
-                clearPartition(tp);
+            // Remove any assigned partitions which are no longer subscribed to
+            for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
+                TopicPartition tp = it.next();
+                if (!subscription.contains(tp.topic()))
+                    it.remove();
+            }
+        }
     }
 
     public void needReassignment() {
         this.needsPartitionAssignment = true;
     }
 
-    public void subscribe(TopicPartition tp) {
-        if (!this.subscribedTopics.isEmpty())
+    public void assign(List<TopicPartition> partitions) {
+        if (!this.subscription.isEmpty())
             throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
-        this.subscribedPartitions.add(tp);
-        addAssignedPartition(tp);
-    }
 
-    public void unsubscribe(TopicPartition partition) {
-        if (!subscribedPartitions.contains(partition))
-            throw new IllegalStateException("Partition " + partition + " was never subscribed to.");
-        subscribedPartitions.remove(partition);
-        clearPartition(partition);
-    }
-    
-    private void clearPartition(TopicPartition tp) {
-        this.assignedPartitions.remove(tp);
+        this.userAssignment.clear();
+        this.userAssignment.addAll(partitions);
+
+        for (TopicPartition partition : partitions)
+            if (!assignment.containsKey(partition))
+                addAssignedPartition(partition);
+
+        this.assignment.keySet().retainAll(this.userAssignment);
     }
 
     public void clearAssignment() {
-        this.assignedPartitions.clear();
-        this.needsPartitionAssignment = !subscribedTopics().isEmpty();
+        this.assignment.clear();
+        this.needsPartitionAssignment = !subscription().isEmpty();
     }
 
-    public Set<String> subscribedTopics() {
-        return this.subscribedTopics;
+    public Set<String> subscription() {
+        return this.subscription;
     }
 
     public Long fetched(TopicPartition tp) {
@@ -132,7 +136,7 @@ public class SubscriptionState {
     }
 
     private TopicPartitionState assignedState(TopicPartition tp) {
-        TopicPartitionState state = this.assignedPartitions.get(tp);
+        TopicPartitionState state = this.assignment.get(tp);
         if (state == null)
             throw new IllegalStateException("No current assignment for partition " + tp);
         return state;
@@ -163,12 +167,12 @@ public class SubscriptionState {
     }
 
     public Set<TopicPartition> assignedPartitions() {
-        return this.assignedPartitions.keySet();
+        return this.assignment.keySet();
     }
 
     public Set<TopicPartition> fetchablePartitions() {
         Set<TopicPartition> fetchable = new HashSet<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
+        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
             if (entry.getValue().isFetchable())
                 fetchable.add(entry.getKey());
         }
@@ -176,7 +180,7 @@ public class SubscriptionState {
     }
 
     public boolean partitionsAutoAssigned() {
-        return !this.subscribedTopics.isEmpty();
+        return !this.subscription.isEmpty();
     }
 
     public void consumed(TopicPartition tp, long offset) {
@@ -189,7 +193,7 @@ public class SubscriptionState {
 
     public Map<TopicPartition, Long> allConsumed() {
         Map<TopicPartition, Long> allConsumed = new HashMap<>();
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
+        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
             TopicPartitionState state = entry.getValue();
             if (state.hasValidPosition)
                 allConsumed.put(entry.getKey(), state.consumed);
@@ -214,15 +218,15 @@ public class SubscriptionState {
     }
 
     public boolean hasAllFetchPositions() {
-        for (TopicPartitionState state : assignedPartitions.values())
+        for (TopicPartitionState state : assignment.values())
             if (!state.hasValidPosition)
                 return false;
         return true;
     }
 
     public Set<TopicPartition> missingFetchPositions() {
-        Set<TopicPartition> missing = new HashSet<>(this.assignedPartitions.keySet());
-        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet())
+        Set<TopicPartition> missing = new HashSet<>(this.assignment.keySet());
+        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet())
             if (!entry.getValue().hasValidPosition)
                 missing.add(entry.getKey());
         return missing;
@@ -234,7 +238,7 @@ public class SubscriptionState {
 
     public void changePartitionAssignment(List<TopicPartition> assignments) {
         for (TopicPartition tp : assignments)
-            if (!this.subscribedTopics.contains(tp.topic()))
+            if (!this.subscription.contains(tp.topic()))
                 throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
         this.clearAssignment();
         for (TopicPartition tp: assignments)
@@ -243,7 +247,7 @@ public class SubscriptionState {
     }
 
     public boolean isAssigned(TopicPartition tp) {
-        return assignedPartitions.containsKey(tp);
+        return assignment.containsKey(tp);
     }
 
     public boolean isPaused(TopicPartition tp) {
@@ -263,7 +267,11 @@ public class SubscriptionState {
     }
 
     private void addAssignedPartition(TopicPartition tp) {
-        this.assignedPartitions.put(tp, new TopicPartitionState());
+        this.assignment.put(tp, new TopicPartitionState());
+    }
+
+    public RebalanceListener listener() {
+        return listener;
     }
 
     private static class TopicPartitionState {
@@ -332,4 +340,46 @@ public class SubscriptionState {
 
     }
 
+    public static RebalanceListener wrapListener(final Consumer<?, ?> consumer,
+                                                 final ConsumerRebalanceListener listener) {
+        if (listener == null)
+            throw new IllegalArgumentException("ConsumerRebalanceListener must not be null");
+
+        return new RebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                listener.onPartitionsAssigned(consumer, partitions);
+            }
+
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                listener.onPartitionsRevoked(consumer, partitions);
+            }
+
+            @Override
+            public ConsumerRebalanceListener underlying() {
+                return listener;
+            }
+        };
+    }
+
+    /**
+     * Wrapper around {@link ConsumerRebalanceListener} so that this class does not need to hold
+     * a reference to the consumer.
+     */
+    public static class RebalanceListener {
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+
+        }
+
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+
+        }
+
+        public ConsumerRebalanceListener underlying() {
+            return null;
+        }
+    }
+
+
 }
\ No newline at end of file
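
To illustrate the refactored SubscriptionState API above, the following sketch mirrors the usage in the updated unit tests: assign(List) for manual partition assignment and subscribe(List, RebalanceListener) for topic subscription, which the class treats as mutually exclusive. The class name, topic, and partition below are placeholders and are not part of this patch.

    import java.util.Arrays;

    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.clients.consumer.internals.SubscriptionState;
    import org.apache.kafka.common.TopicPartition;

    public class SubscriptionStateSketch {
        public static void main(String[] args) {
            // Manual assignment: assign() replaces subscribe(TopicPartition); a partition becomes
            // fetchable once its position has been set with seek().
            SubscriptionState assigned = new SubscriptionState(OffsetResetStrategy.EARLIEST);
            TopicPartition tp = new TopicPartition("test", 0);   // placeholder topic-partition
            assigned.assign(Arrays.asList(tp));
            assigned.seek(tp, 0L);
            System.out.println(assigned.fetchablePartitions());  // [test-0]

            // Automatic assignment: subscribe(List, RebalanceListener) replaces subscribe(String);
            // the coordinator later installs partitions via changePartitionAssignment().
            SubscriptionState subscribed = new SubscriptionState(OffsetResetStrategy.EARLIEST);
            subscribed.subscribe(Arrays.asList("test"), new SubscriptionState.RebalanceListener());
            subscribed.changePartitionAssignment(Arrays.asList(tp));
            System.out.println(subscribed.assignedPartitions()); // [test-0]
        }
    }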

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 738f3ed..7625218 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -37,7 +37,7 @@ public class KafkaConsumerTest {
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
-                    props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+                    props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         } catch (KafkaException e) {
             Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
             Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index d4da642..6e6a118 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import static org.junit.Assert.*;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class MockConsumerTest {
     
@@ -29,7 +32,21 @@ public class MockConsumerTest {
 
     @Test
     public void testSimpleMock() {
-        consumer.subscribe("test");
+        consumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+
+            }
+
+            @Override
+            public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+
+            }
+        });
+
+
+
+
         assertEquals(0, consumer.poll(1000).count());
         ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
         ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index a23b8e7..3452639 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -72,7 +73,7 @@ public class CoordinatorTest {
     private Metrics metrics;
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
     private ConsumerNetworkClient consumerClient;
-    private MockRebalanceCallback rebalanceCallback;
+    private MockSubscriptionListener subscriptionListener;
     private Coordinator coordinator;
 
     @Before
@@ -83,7 +84,7 @@ public class CoordinatorTest {
         this.metadata = new Metadata(0, Long.MAX_VALUE);
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
         this.metrics = new Metrics(time);
-        this.rebalanceCallback = new MockRebalanceCallback();
+        this.subscriptionListener = new MockSubscriptionListener();
 
         client.setNode(node);
 
@@ -98,8 +99,7 @@ public class CoordinatorTest {
                 metricTags,
                 time,
                 requestTimeoutMs,
-                retryBackoffMs,
-                rebalanceCallback);
+                retryBackoffMs);
     }
 
     @Test
@@ -168,7 +168,7 @@ public class CoordinatorTest {
         coordinator.ensureCoordinatorKnown();
 
         // illegal_generation will cause re-partition
-        subscriptions.subscribe(topicName);
+        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
         subscriptions.changePartitionAssignment(Collections.singletonList(tp));
 
         time.sleep(sessionTimeoutMs);
@@ -192,7 +192,7 @@ public class CoordinatorTest {
         coordinator.ensureCoordinatorKnown();
 
         // illegal_generation will cause re-partition
-        subscriptions.subscribe(topicName);
+        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
         subscriptions.changePartitionAssignment(Collections.singletonList(tp));
 
         time.sleep(sessionTimeoutMs);
@@ -233,7 +233,7 @@ public class CoordinatorTest {
 
     @Test
     public void testNormalJoinGroup() {
-        subscriptions.subscribe(topicName);
+        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -245,15 +245,15 @@ public class CoordinatorTest {
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
-        assertEquals(1, rebalanceCallback.revokedCount);
-        assertEquals(Collections.emptySet(), rebalanceCallback.revoked);
-        assertEquals(1, rebalanceCallback.assignedCount);
-        assertEquals(Collections.singleton(tp), rebalanceCallback.assigned);
+        assertEquals(1, subscriptionListener.revokedCount);
+        assertEquals(Collections.emptySet(), subscriptionListener.revoked);
+        assertEquals(1, subscriptionListener.assignedCount);
+        assertEquals(Collections.singleton(tp), subscriptionListener.assigned);
     }
 
     @Test
     public void testReJoinGroup() {
-        subscriptions.subscribe(topicName);
+        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -266,15 +266,15 @@ public class CoordinatorTest {
         coordinator.ensurePartitionAssignment();
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
-        assertEquals(1, rebalanceCallback.revokedCount);
-        assertEquals(Collections.emptySet(), rebalanceCallback.revoked);
-        assertEquals(1, rebalanceCallback.assignedCount);
-        assertEquals(Collections.singleton(tp), rebalanceCallback.assigned);
+        assertEquals(1, subscriptionListener.revokedCount);
+        assertEquals(Collections.emptySet(), subscriptionListener.revoked);
+        assertEquals(1, subscriptionListener.assignedCount);
+        assertEquals(Collections.singleton(tp), subscriptionListener.assigned);
     }
 
     @Test(expected = ApiException.class)
     public void testUnknownPartitionAssignmentStrategy() {
-        subscriptions.subscribe(topicName);
+        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -287,7 +287,7 @@ public class CoordinatorTest {
 
     @Test(expected = ApiException.class)
     public void testInvalidSessionTimeout() {
-        subscriptions.subscribe(topicName);
+        subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -431,7 +431,7 @@ public class CoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
@@ -444,7 +444,7 @@ public class CoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
@@ -458,7 +458,7 @@ public class CoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -473,7 +473,7 @@ public class CoordinatorTest {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();
 
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
         coordinator.refreshCommittedOffsetsIfNeeded();
@@ -528,7 +528,7 @@ public class CoordinatorTest {
         }
     }
 
-    private static class MockRebalanceCallback implements Coordinator.RebalanceCallback {
+    private static class MockSubscriptionListener extends SubscriptionState.RebalanceListener {
         public Collection<TopicPartition> revoked;
         public Collection<TopicPartition> assigned;
         public int revokedCount = 0;
@@ -546,5 +546,6 @@ public class CoordinatorTest {
             this.revoked = partitions;
             revokedCount++;
         }
+
     }
 }
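
The MockSubscriptionListener above extends the new internal SubscriptionState.RebalanceListener directly. Application code instead provides the public ConsumerRebalanceListener, which SubscriptionState.wrapListener() adapts by binding the consumer reference. A minimal sketch of that bridging follows; the logging listener and the adapt() helper are illustrative only.

    import java.util.Collection;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.internals.SubscriptionState;
    import org.apache.kafka.common.TopicPartition;

    public class WrapListenerSketch {
        // Adapts a user-facing ConsumerRebalanceListener (whose callbacks receive the consumer)
        // to the internal SubscriptionState.RebalanceListener (whose callbacks do not).
        public static SubscriptionState.RebalanceListener adapt(Consumer<?, ?> consumer) {
            ConsumerRebalanceListener userListener = new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsAssigned(Consumer<?, ?> c, Collection<TopicPartition> partitions) {
                    System.out.println("assigned: " + partitions);
                }

                @Override
                public void onPartitionsRevoked(Consumer<?, ?> c, Collection<TopicPartition> partitions) {
                    System.out.println("revoked: " + partitions);
                }
            };
            return SubscriptionState.wrapListener(consumer, userListener);
        }
    }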

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 22712bb..f2a8381 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class FetcherTest {
+    private SubscriptionState.RebalanceListener listener = new SubscriptionState.RebalanceListener();
     private String topicName = "test";
     private String groupId = "test-group";
     private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
@@ -107,7 +108,7 @@ public class FetcherTest {
     @Test
     public void testFetchNormal() {
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch
@@ -127,7 +128,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchDuringRebalance() {
-        subscriptions.subscribe(topicName);
+        subscriptions.subscribe(Arrays.asList(topicName), listener);
         subscriptions.changePartitionAssignment(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
@@ -144,7 +145,7 @@ public class FetcherTest {
 
     @Test
     public void testInFlightFetchOnPausedPartition() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -157,7 +158,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchOnPausedPartition() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         subscriptions.pause(tp);
@@ -167,7 +168,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchNotLeaderForPartition() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -179,7 +180,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchUnknownTopicOrPartition() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -191,7 +192,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchOffsetOutOfRange() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -205,7 +206,7 @@ public class FetcherTest {
 
     @Test
     public void testFetchDisconnected() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
@@ -224,7 +225,7 @@ public class FetcherTest {
     public void testUpdateFetchPositionToCommitted() {
         // unless a specific reset is expected, the default behavior is to reset to the committed
         // position if one is present
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.committed(tp, 5);
 
         fetcher.updateFetchPositions(Collections.singleton(tp));
@@ -235,7 +236,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionResetToDefaultOffset() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         // with no commit position, we should reset using the default strategy defined above (EARLIEST)
 
         client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
@@ -249,7 +250,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionResetToLatestOffset() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
@@ -263,7 +264,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionResetToEarliestOffset() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
 
         client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
@@ -277,7 +278,7 @@ public class FetcherTest {
 
     @Test
     public void testUpdateFetchPositionDisconnect() {
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
@@ -311,7 +312,7 @@ public class FetcherTest {
     @Test
     public void testQuotaMetrics() throws Exception {
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.subscribe(tp);
+        subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
         // normal fetch

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 1ba6f7a..e4830b1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static java.util.Arrays.asList;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -28,20 +29,21 @@ import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
 public class SubscriptionStateTest {
-    
+
+    private SubscriptionState.RebalanceListener listener = new SubscriptionState.RebalanceListener();
     private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private final TopicPartition tp0 = new TopicPartition("test", 0);
     private final TopicPartition tp1 = new TopicPartition("test", 1);
 
     @Test
-    public void partitionSubscription() {      
-        state.subscribe(tp0);
+    public void partitionAssignment() {
+        state.assign(Arrays.asList(tp0));
         assertEquals(Collections.singleton(tp0), state.assignedPartitions());
         state.committed(tp0, 1);
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
         assertAllPositions(tp0, 1L);
-        state.unsubscribe(tp0);
+        state.assign(Arrays.<TopicPartition>asList());
         assertTrue(state.assignedPartitions().isEmpty());
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp0));
@@ -49,7 +51,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void partitionReset() {
-        state.subscribe(tp0);
+        state.assign(Arrays.asList(tp0));
         state.seek(tp0, 5);
         assertEquals(5L, (long) state.fetched(tp0));
         assertEquals(5L, (long) state.consumed(tp0));
@@ -67,8 +69,8 @@ public class SubscriptionStateTest {
 
     @Test
     public void topicSubscription() {
-        state.subscribe("test");
-        assertEquals(1, state.subscribedTopics().size());
+        state.subscribe(Arrays.asList("test"), listener);
+        assertEquals(1, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
         assertTrue(state.partitionsAutoAssigned());
         state.changePartitionAssignment(asList(tp0));
@@ -84,7 +86,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void partitionPause() {
-        state.subscribe(tp0);
+        state.assign(Arrays.asList(tp0));
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
         state.pause(tp0);
@@ -96,8 +98,8 @@ public class SubscriptionStateTest {
     @Test
     public void topicUnsubscription() {
         final String topic = "test";
-        state.subscribe(topic);
-        assertEquals(1, state.subscribedTopics().size());
+        state.subscribe(Arrays.asList(topic), listener);
+        assertEquals(1, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
         assertTrue(state.partitionsAutoAssigned());
         state.changePartitionAssignment(asList(tp0));
@@ -108,21 +110,21 @@ public class SubscriptionStateTest {
         assertFalse(state.isAssigned(tp0));
         assertEquals(Collections.singleton(tp1), state.assignedPartitions());
 
-        state.unsubscribe(topic);
-        assertEquals(0, state.subscribedTopics().size());
+        state.subscribe(Arrays.<String>asList(), listener);
+        assertEquals(0, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
     }
 
     @Test(expected = IllegalStateException.class)
     public void invalidConsumedPositionUpdate() {
-        state.subscribe("test");
+        state.subscribe(Arrays.asList("test"), listener);
         state.changePartitionAssignment(asList(tp0));
         state.consumed(tp0, 0);
     }
 
     @Test(expected = IllegalStateException.class)
     public void invalidFetchPositionUpdate() {
-        state.subscribe("test");
+        state.subscribe(Arrays.asList("test"), listener);
         state.changePartitionAssignment(asList(tp0));
         state.fetched(tp0, 0);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 4eaf756..272f62a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -120,7 +120,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
      **/
     public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
         HashMap<TopicPartition, Long> offsets = new HashMap<>();
-        for (TopicPartition tp : consumer.subscriptions()) {
+        for (TopicPartition tp : consumer.assignment()) {
             offsets.put(tp, consumer.position(tp));
         }
         // We only don't flush the task in one case: when shutting down, the task has already been
@@ -179,7 +179,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
         }
 
         log.debug("Task {} subscribing to topics {}", id, topics);
-        newConsumer.subscribe(topics);
+        newConsumer.subscribe(Arrays.asList(topics));
 
         // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
         // enable exactly once delivery to that system).
@@ -188,7 +188,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
         // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
         newConsumer.poll(0);
         Map<TopicPartition, Long> offsets = context.getOffsets();
-        for (TopicPartition tp : newConsumer.subscriptions()) {
+        for (TopicPartition tp : newConsumer.assignment()) {
             Long offset = offsets.get(tp);
             if (offset != null)
                 newConsumer.seek(tp, offset);
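
The WorkerSinkTask changes track the consumer-facing rename in this patch: subscriptions() becomes assignment(), and subscribe() now takes a list of topics. For reference, a minimal consumer-side sketch of the same pattern; the bootstrap server, group id, and topic names are placeholders, and a single poll(0) is assumed, as in WorkerSinkTask above, to be enough to trigger the group join before the assignment is read.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class AssignmentSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "assignment-sketch");       // placeholder

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            try {
                consumer.subscribe(Arrays.asList("topicA", "topicB")); // placeholder topics
                consumer.poll(0); // join the group so that assignment() is populated
                for (TopicPartition tp : consumer.assignment())
                    System.out.println(tp + " at position " + consumer.position(tp));
            } finally {
                consumer.close();
            }
        }
    }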

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 0c6f950..e5286e3 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -330,7 +330,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
             throws Exception {
         final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
 
-        EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION));
+        EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION));
         EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(
                 new IAnswer<Long>() {
                     @Override