You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/09/21 22:10:49 UTC

kafka git commit: KAFKA-2532; Remove Consumer reference from rebalance callback

Repository: kafka
Updated Branches:
  refs/heads/trunk e18f6860c -> 6ec88f7f8


KAFKA-2532; Remove Consumer reference from rebalance callback

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava, Onur Karaman, Guozhang Wang

Closes #203 from hachikuji/KAFKA-2532


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ec88f7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ec88f7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ec88f7f

Branch: refs/heads/trunk
Commit: 6ec88f7f8a18be7aff12fb0c73fc8abd00ef62d2
Parents: e18f686
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Sep 21 13:14:17 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Sep 21 13:14:17 2015 -0700

----------------------------------------------------------------------
 .../consumer/ConsumerRebalanceListener.java     | 32 ++++++------
 .../kafka/clients/consumer/KafkaConsumer.java   | 17 +++----
 .../kafka/clients/consumer/MockConsumer.java    |  4 +-
 .../clients/consumer/internals/Coordinator.java |  7 +--
 .../NoOpConsumerRebalanceListener.java          |  9 ++--
 .../consumer/internals/SubscriptionState.java   | 53 ++------------------
 .../clients/consumer/MockConsumerTest.java      | 18 +------
 .../consumer/internals/CoordinatorTest.java     |  7 +--
 .../clients/consumer/internals/FetcherTest.java |  3 +-
 .../internals/SubscriptionStateTest.java        | 37 +++++++-------
 .../scala/kafka/tools/ConsumerPerformance.scala |  4 +-
 .../integration/kafka/api/ConsumerTest.scala    |  4 +-
 12 files changed, 71 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index 68939cb..671b6f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.TopicPartition;
  * administratively adjusted).
  * <p>
  * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
- * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
+ * the {@link #onPartitionsRevoked(Collection)}, call we can ensure that any time partition assignment changes
  * the offset gets saved.
  * <p>
  * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
@@ -42,21 +42,27 @@ import org.apache.kafka.common.TopicPartition;
  * <p>
  * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
  * <p>
- * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to 
- * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the 
- * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
- * partition has their {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
+ * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} prior to
+ * any process invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
+ * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
+ * partition has their {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback called to load the state.
  * <p>
  * Here is pseudo-code for a callback implementation for saving offsets:
  * <pre>
  * {@code
  *   public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
- *       public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ *       private Consumer<?,?> consumer;
+ *
+ *       public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
+ *           this.consumer = consumer;
+ *       }
+ *
+ *       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  *           // read the offsets from an external store using some custom code not described here
  *           for(TopicPartition partition: partitions)
  *              consumer.seek(partition, readOffsetFromExternalStore(partition));
  *       }      
- *       public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ *       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  *           // save the offsets in an external store using some custom code not described here
  *           for(TopicPartition partition: partitions)
  *              saveOffsetInExternalStore(consumer.position(partition));
@@ -69,18 +75,17 @@ public interface ConsumerRebalanceListener {
 
     /**
      * A callback method the user can implement to provide handling of customized offsets on completion of a successful
-     * partition re-assignement. This method will be called after an offset re-assignement completes and before the
+     * partition re-assignment. This method will be called after an offset re-assignment completes and before the
      * consumer starts fetching data.
      * <p>
      * It is guaranteed that all the processes in a consumer group will execute their
-     * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
-     * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
+     * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
+     * {@link #onPartitionsAssigned(Collection)} callback.
      *
-     * @param consumer Reference to the consumer for convenience
      * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
      *            assigned to the consumer)
      */
-    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions);
 
     /**
      * A callback method the user can implement to provide handling of offset commits to a customized store on the start
@@ -90,8 +95,7 @@ public interface ConsumerRebalanceListener {
      * <p>
      * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
      *
-     * @param consumer  Reference to the consumer for convenience
      * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
      */
-    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3b461b3..8831b0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -271,14 +271,13 @@ import java.util.regex.Pattern;
  * </ol>
  *
  * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
- * search index use case described above). If the partition assignment is done automatically special care will also be
- * needed to handle the case where partition assignments change. This can be handled using a special callback specified
- * using <code>rebalance.callback.class</code>, which specifies an implementation of the interface
- * {@link ConsumerRebalanceListener}. When partitions are taken from a consumer the consumer will want to commit its
- * offset for those partitions by implementing
- * {@link ConsumerRebalanceListener#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
+ * search index use case described above). If the partition assignment is done automatically special care is
+ * needed to handle the case where partition assignments change. This can be done by providing a
+ * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}.
+ * When partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
+ * implementing {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. When partitions are assigned to a
  * consumer, the consumer will want to look up the offset for those new partitions an correctly initialize the consumer
- * to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Consumer, Collection)}.
+ * to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}.
  * <p>
  * Another common use for {@link ConsumerRebalanceListener} is to flush any caches the application maintains for
  * partitions that are moved elsewhere.
@@ -651,7 +650,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         acquire();
         try {
             log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
-            this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
+            this.subscriptions.subscribe(topics, listener);
             metadata.setTopics(topics);
         } finally {
             release();
@@ -712,7 +711,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metadata.setTopics(topicsToSubscribe);
                 }
             };
-            this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
+            this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
             this.metadata.addListener(metadataListener);
         } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 78c9c15..0ba5797 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -67,7 +67,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Override
     public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
-        this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
+        this.subscriptions.subscribe(pattern, listener);
         List<String> topicsToSubscribe = new ArrayList<>();
         for (String topic: partitions.keySet()) {
             if (pattern.matcher(topic).matches() &&
@@ -81,7 +81,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Override
     public synchronized void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
-        this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
+        this.subscriptions.subscribe(topics, listener);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index e7ffe25..374eceb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
@@ -155,7 +156,7 @@ public final class Coordinator {
         if (!subscriptions.partitionAssignmentNeeded())
             return;
 
-        SubscriptionState.RebalanceListener listener = subscriptions.listener();
+        ConsumerRebalanceListener listener = subscriptions.listener();
 
         // execute the user's listener before rebalance
         log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
@@ -163,7 +164,7 @@ public final class Coordinator {
             Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsRevoked(revoked);
         } catch (Exception e) {
-            log.error("User provided listener " + listener.underlying().getClass().getName()
+            log.error("User provided listener " + listener.getClass().getName()
                     + " failed on partition revocation: ", e);
         }
 
@@ -175,7 +176,7 @@ public final class Coordinator {
             Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsAssigned(assigned);
         } catch (Exception e) {
-            log.error("User provided listener " + listener.underlying().getClass().getName()
+            log.error("User provided listener " + listener.getClass().getName()
                     + " failed on partition assignment: ", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
index d88d71c..3cb152d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
@@ -13,18 +13,17 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
-import java.util.Collection;
-
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collection;
+
 public class NoOpConsumerRebalanceListener implements ConsumerRebalanceListener {
 
     @Override
-    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
 
     @Override
-    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index c92a581..f976df4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -12,12 +12,10 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -70,7 +68,8 @@ public class SubscriptionState {
     private final OffsetResetStrategy defaultResetStrategy;
 
     /* Listener to be invoked when assignment changes */
-    private RebalanceListener listener;
+    private ConsumerRebalanceListener listener;
+
     private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
         "Subscription to topics, partitions and pattern are mutually exclusive";
 
@@ -84,7 +83,7 @@ public class SubscriptionState {
         this.subscribedPattern = null;
     }
 
-    public void subscribe(List<String> topics, RebalanceListener listener) {
+    public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be null");
 
@@ -130,7 +129,7 @@ public class SubscriptionState {
         this.assignment.keySet().retainAll(this.userAssignment);
     }
 
-    public void subscribe(Pattern pattern, RebalanceListener listener) {
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be null");
 
@@ -305,7 +304,7 @@ public class SubscriptionState {
         this.assignment.put(tp, new TopicPartitionState());
     }
 
-    public RebalanceListener listener() {
+    public ConsumerRebalanceListener listener() {
         return listener;
     }
 
@@ -375,46 +374,4 @@ public class SubscriptionState {
 
     }
 
-    public static RebalanceListener wrapListener(final Consumer<?, ?> consumer,
-                                                 final ConsumerRebalanceListener listener) {
-        if (listener == null)
-            throw new IllegalArgumentException("ConsumerRebalanceLister must not be null");
-
-        return new RebalanceListener() {
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                listener.onPartitionsAssigned(consumer, partitions);
-            }
-
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                listener.onPartitionsRevoked(consumer, partitions);
-            }
-
-            @Override
-            public ConsumerRebalanceListener underlying() {
-                return listener;
-            }
-        };
-    }
-
-    /**
-     * Wrapper around {@link ConsumerRebalanceListener} to get around the need to provide a reference
-     * to the consumer in this class.
-     */
-    public static class RebalanceListener {
-        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-
-        }
-
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-
-        }
-
-        public ConsumerRebalanceListener underlying() {
-            return null;
-        }
-    }
-
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 068322f..24e3d81 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 
 import static org.junit.Assert.assertEquals;
@@ -32,21 +32,7 @@ public class MockConsumerTest {
 
     @Test
     public void testSimpleMock() {
-        consumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-
-            }
-
-            @Override
-            public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-
-            }
-        });
-
-
-
-
+        consumer.subscribe(Arrays.asList("test"), new NoOpConsumerRebalanceListener());
         assertEquals(0, consumer.poll(1000).count());
         ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
         ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 9eb2a27..490578e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
@@ -72,7 +73,7 @@ public class CoordinatorTest {
     private Metrics metrics;
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
     private ConsumerNetworkClient consumerClient;
-    private MockSubscriptionListener subscriptionListener;
+    private MockRebalanceListener subscriptionListener;
     private MockCommitCallback defaultOffsetCommitCallback;
     private Coordinator coordinator;
 
@@ -84,7 +85,7 @@ public class CoordinatorTest {
         this.metadata = new Metadata(0, Long.MAX_VALUE);
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
         this.metrics = new Metrics(time);
-        this.subscriptionListener = new MockSubscriptionListener();
+        this.subscriptionListener = new MockRebalanceListener();
         this.defaultOffsetCommitCallback = new MockCommitCallback();
 
         client.setNode(node);
@@ -533,7 +534,7 @@ public class CoordinatorTest {
         }
     }
 
-    private static class MockSubscriptionListener extends SubscriptionState.RebalanceListener {
+    private static class MockRebalanceListener implements ConsumerRebalanceListener {
         public Collection<TopicPartition> revoked;
         public Collection<TopicPartition> assigned;
         public int revokedCount = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d79a10e..a4f6c19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
@@ -56,7 +57,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class FetcherTest {
-    private SubscriptionState.RebalanceListener listener = new SubscriptionState.RebalanceListener();
+    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
     private String topicName = "test";
     private String groupId = "test-group";
     private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index eb2c570..a279faa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -26,18 +26,17 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
 public class SubscriptionStateTest {
 
-    private SubscriptionState.RebalanceListener listener = new SubscriptionState.RebalanceListener();
     private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private final TopicPartition tp0 = new TopicPartition("test", 0);
     private final TopicPartition tp1 = new TopicPartition("test", 1);
-    private final MockSubscriptionListener mockSubscriptionListener = new
-        MockSubscriptionListener();
+    private final MockRebalanceListener rebalanceListener = new MockRebalanceListener();
 
     @Test
     public void partitionAssignment() {
@@ -73,7 +72,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void topicSubscription() {
-        state.subscribe(Arrays.asList("test"), listener);
+        state.subscribe(Arrays.asList("test"), rebalanceListener);
         assertEquals(1, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
         assertTrue(state.partitionsAutoAssigned());
@@ -102,7 +101,7 @@ public class SubscriptionStateTest {
     @Test
     public void topicUnsubscription() {
         final String topic = "test";
-        state.subscribe(Arrays.asList(topic), listener);
+        state.subscribe(Arrays.asList(topic), rebalanceListener);
         assertEquals(1, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
         assertTrue(state.partitionsAutoAssigned());
@@ -114,21 +113,21 @@ public class SubscriptionStateTest {
         assertFalse(state.isAssigned(tp0));
         assertEquals(Collections.singleton(tp1), state.assignedPartitions());
 
-        state.subscribe(Arrays.<String>asList(), listener);
+        state.subscribe(Arrays.<String>asList(), rebalanceListener);
         assertEquals(0, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
     }
 
     @Test(expected = IllegalStateException.class)
     public void invalidConsumedPositionUpdate() {
-        state.subscribe(Arrays.asList("test"), listener);
+        state.subscribe(Arrays.asList("test"), rebalanceListener);
         state.changePartitionAssignment(asList(tp0));
         state.consumed(tp0, 0);
     }
 
     @Test(expected = IllegalStateException.class)
     public void invalidFetchPositionUpdate() {
-        state.subscribe(Arrays.asList("test"), listener);
+        state.subscribe(Arrays.asList("test"), rebalanceListener);
         state.changePartitionAssignment(asList(tp0));
         state.fetched(tp0, 0);
     }
@@ -151,49 +150,49 @@ public class SubscriptionStateTest {
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribeTopicAndPattern() {
-        state.subscribe(Arrays.asList("test"), mockSubscriptionListener);
-        state.subscribe(Pattern.compile(".*"), listener);
+        state.subscribe(Arrays.asList("test"), rebalanceListener);
+        state.subscribe(Pattern.compile(".*"), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePartitionAndPattern() {
         state.assign(Arrays.asList(new TopicPartition("test", 0)));
-        state.subscribe(Pattern.compile(".*"), listener);
+        state.subscribe(Pattern.compile(".*"), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePatternAndTopic() {
-        state.subscribe(Pattern.compile(".*"), listener);
-        state.subscribe(Arrays.asList("test"), mockSubscriptionListener);
+        state.subscribe(Pattern.compile(".*"), rebalanceListener);
+        state.subscribe(Arrays.asList("test"), rebalanceListener);
     }
 
     @Test(expected = IllegalStateException.class)
     public void cantSubscribePatternAndPartition() {
-        state.subscribe(Pattern.compile(".*"), listener);
+        state.subscribe(Pattern.compile(".*"), rebalanceListener);
         state.assign(Arrays.asList(new TopicPartition("test", 0)));
     }
 
     @Test
     public void patternSubscription() {
-        state.subscribe(Pattern.compile(".*"), listener);
+        state.subscribe(Pattern.compile(".*"), rebalanceListener);
         state.changeSubscription(Arrays.asList("test", "test1"));
 
         assertEquals(
-            "Expected subscribed topics count is incorrect", 2, state.subscription().size());
+                "Expected subscribed topics count is incorrect", 2, state.subscription().size());
     }
 
     @Test
     public void patternUnsubscription() {
-        state.subscribe(Pattern.compile(".*"), listener);
+        state.subscribe(Pattern.compile(".*"), rebalanceListener);
         state.changeSubscription(Arrays.asList("test", "test1"));
 
         state.unsubscribe();
 
         assertEquals(
-            "Expected subscribed topics count is incorrect", 0, state.subscription().size());
+                "Expected subscribed topics count is incorrect", 0, state.subscription().size());
     }
 
-    private static class MockSubscriptionListener extends SubscriptionState.RebalanceListener {
+    private static class MockRebalanceListener implements ConsumerRebalanceListener {
         public Collection<TopicPartition> revoked;
         public Collection<TopicPartition> assigned;
         public int revokedCount = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 1be4b23..8267030 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -107,10 +107,10 @@ object ConsumerPerformance {
     val joinTimeout = 10000
     val isAssigned = new AtomicBoolean(false)
     consumer.subscribe(topics, new ConsumerRebalanceListener {
-      def onPartitionsAssigned(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+      def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
         isAssigned.set(true)
       }
-      def onPartitionsRevoked(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+      def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
         isAssigned.set(false)
       }})
     val joinStart = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 962d2b8..1b1973f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -428,11 +428,11 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   private class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0
     var callsToRevoked = 0
-    def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+    def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsAssigned called.")
       callsToAssigned += 1
     }
-    def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+    def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsRevoked called.")
       callsToRevoked += 1
     }