You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/09/21 22:10:49 UTC
kafka git commit: KAFKA-2532; Remove Consumer reference from rebalance callback
Repository: kafka
Updated Branches:
refs/heads/trunk e18f6860c -> 6ec88f7f8
KAFKA-2532; Remove Consumer reference from rebalance callback
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava, Onur Karaman, Guozhang Wang
Closes #203 from hachikuji/KAFKA-2532
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ec88f7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ec88f7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ec88f7f
Branch: refs/heads/trunk
Commit: 6ec88f7f8a18be7aff12fb0c73fc8abd00ef62d2
Parents: e18f686
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Sep 21 13:14:17 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Sep 21 13:14:17 2015 -0700
----------------------------------------------------------------------
.../consumer/ConsumerRebalanceListener.java | 32 ++++++------
.../kafka/clients/consumer/KafkaConsumer.java | 17 +++----
.../kafka/clients/consumer/MockConsumer.java | 4 +-
.../clients/consumer/internals/Coordinator.java | 7 +--
.../NoOpConsumerRebalanceListener.java | 9 ++--
.../consumer/internals/SubscriptionState.java | 53 ++------------------
.../clients/consumer/MockConsumerTest.java | 18 +------
.../consumer/internals/CoordinatorTest.java | 7 +--
.../clients/consumer/internals/FetcherTest.java | 3 +-
.../internals/SubscriptionStateTest.java | 37 +++++++-------
.../scala/kafka/tools/ConsumerPerformance.scala | 4 +-
.../integration/kafka/api/ConsumerTest.scala | 4 +-
12 files changed, 71 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index 68939cb..671b6f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.TopicPartition;
* administratively adjusted).
* <p>
* There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
- * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
+ * the {@link #onPartitionsRevoked(Collection)} call, we can ensure that any time partition assignment changes
* the offset gets saved.
* <p>
* Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
@@ -42,21 +42,27 @@ import org.apache.kafka.common.TopicPartition;
* <p>
* This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
* <p>
- * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to
- * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
- * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
- * partition has their {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
+ * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} prior to
+ * any process invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
+ * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
+ * partition has their {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback called to load the state.
* <p>
* Here is pseudo-code for a callback implementation for saving offsets:
* <pre>
* {@code
* public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
- * public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ * private Consumer<?,?> consumer;
+ *
+ * public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
+ * this.consumer = consumer;
+ * }
+ *
+ * public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
* // read the offsets from an external store using some custom code not described here
* for(TopicPartition partition: partitions)
* consumer.seek(partition, readOffsetFromExternalStore(partition));
* }
- * public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ * public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
* // save the offsets in an external store using some custom code not described here
* for(TopicPartition partition: partitions)
* saveOffsetInExternalStore(consumer.position(partition));
@@ -69,18 +75,17 @@ public interface ConsumerRebalanceListener {
/**
* A callback method the user can implement to provide handling of customized offsets on completion of a successful
- * partition re-assignement. This method will be called after an offset re-assignement completes and before the
+ * partition re-assignment. This method will be called after an offset re-assignment completes and before the
* consumer starts fetching data.
* <p>
* It is guaranteed that all the processes in a consumer group will execute their
- * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
- * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
+ * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
+ * {@link #onPartitionsAssigned(Collection)} callback.
*
- * @param consumer Reference to the consumer for convenience
* @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
* assigned to the consumer)
*/
- public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions);
/**
* A callback method the user can implement to provide handling of offset commits to a customized store on the start
@@ -90,8 +95,7 @@ public interface ConsumerRebalanceListener {
* <p>
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
*
- * @param consumer Reference to the consumer for convenience
* @param partitions The list of partitions that were assigned to the consumer on the last rebalance
*/
- public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3b461b3..8831b0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -271,14 +271,13 @@ import java.util.regex.Pattern;
* </ol>
*
* This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
- * search index use case described above). If the partition assignment is done automatically special care will also be
- * needed to handle the case where partition assignments change. This can be handled using a special callback specified
- * using <code>rebalance.callback.class</code>, which specifies an implementation of the interface
- * {@link ConsumerRebalanceListener}. When partitions are taken from a consumer the consumer will want to commit its
- * offset for those partitions by implementing
- * {@link ConsumerRebalanceListener#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
+ * search index use case described above). If the partition assignment is done automatically special care is
+ * needed to handle the case where partition assignments change. This can be done by providing a
+ * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}.
+ * When partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
+ * implementing {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. When partitions are assigned to a
* consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
- * to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Consumer, Collection)}.
+ * to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}.
* <p>
* Another common use for {@link ConsumerRebalanceListener} is to flush any caches the application maintains for
* partitions that are moved elsewhere.
@@ -651,7 +650,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
acquire();
try {
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
- this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
+ this.subscriptions.subscribe(topics, listener);
metadata.setTopics(topics);
} finally {
release();
@@ -712,7 +711,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
metadata.setTopics(topicsToSubscribe);
}
};
- this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
+ this.subscriptions.subscribe(pattern, listener);
this.metadata.needMetadataForAllTopics(true);
this.metadata.addListener(metadataListener);
} finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 78c9c15..0ba5797 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -67,7 +67,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
@Override
public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
ensureNotClosed();
- this.subscriptions.subscribe(pattern, SubscriptionState.wrapListener(this, listener));
+ this.subscriptions.subscribe(pattern, listener);
List<String> topicsToSubscribe = new ArrayList<>();
for (String topic: partitions.keySet()) {
if (pattern.matcher(topic).matches() &&
@@ -81,7 +81,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
@Override
public synchronized void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
ensureNotClosed();
- this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
+ this.subscriptions.subscribe(topics, listener);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index e7ffe25..374eceb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -13,6 +13,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
@@ -155,7 +156,7 @@ public final class Coordinator {
if (!subscriptions.partitionAssignmentNeeded())
return;
- SubscriptionState.RebalanceListener listener = subscriptions.listener();
+ ConsumerRebalanceListener listener = subscriptions.listener();
// execute the user's listener before rebalance
log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
@@ -163,7 +164,7 @@ public final class Coordinator {
Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsRevoked(revoked);
} catch (Exception e) {
- log.error("User provided listener " + listener.underlying().getClass().getName()
+ log.error("User provided listener " + listener.getClass().getName()
+ " failed on partition revocation: ", e);
}
@@ -175,7 +176,7 @@ public final class Coordinator {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
} catch (Exception e) {
- log.error("User provided listener " + listener.underlying().getClass().getName()
+ log.error("User provided listener " + listener.getClass().getName()
+ " failed on partition assignment: ", e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
index d88d71c..3cb152d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
@@ -13,18 +13,17 @@
package org.apache.kafka.clients.consumer.internals;
-import java.util.Collection;
-
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
+import java.util.Collection;
+
public class NoOpConsumerRebalanceListener implements ConsumerRebalanceListener {
@Override
- public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
@Override
- public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index c92a581..f976df4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -12,12 +12,10 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -70,7 +68,8 @@ public class SubscriptionState {
private final OffsetResetStrategy defaultResetStrategy;
/* Listener to be invoked when assignment changes */
- private RebalanceListener listener;
+ private ConsumerRebalanceListener listener;
+
private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
"Subscription to topics, partitions and pattern are mutually exclusive";
@@ -84,7 +83,7 @@ public class SubscriptionState {
this.subscribedPattern = null;
}
- public void subscribe(List<String> topics, RebalanceListener listener) {
+ public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");
@@ -130,7 +129,7 @@ public class SubscriptionState {
this.assignment.keySet().retainAll(this.userAssignment);
}
- public void subscribe(Pattern pattern, RebalanceListener listener) {
+ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");
@@ -305,7 +304,7 @@ public class SubscriptionState {
this.assignment.put(tp, new TopicPartitionState());
}
- public RebalanceListener listener() {
+ public ConsumerRebalanceListener listener() {
return listener;
}
@@ -375,46 +374,4 @@ public class SubscriptionState {
}
- public static RebalanceListener wrapListener(final Consumer<?, ?> consumer,
- final ConsumerRebalanceListener listener) {
- if (listener == null)
- throw new IllegalArgumentException("ConsumerRebalanceLister must not be null");
-
- return new RebalanceListener() {
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- listener.onPartitionsAssigned(consumer, partitions);
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- listener.onPartitionsRevoked(consumer, partitions);
- }
-
- @Override
- public ConsumerRebalanceListener underlying() {
- return listener;
- }
- };
- }
-
- /**
- * Wrapper around {@link ConsumerRebalanceListener} to get around the need to provide a reference
- * to the consumer in this class.
- */
- public static class RebalanceListener {
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-
- }
-
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-
- }
-
- public ConsumerRebalanceListener underlying() {
- return null;
- }
- }
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 068322f..24e3d81 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -16,11 +16,11 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Iterator;
import static org.junit.Assert.assertEquals;
@@ -32,21 +32,7 @@ public class MockConsumerTest {
@Test
public void testSimpleMock() {
- consumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-
- }
-
- @Override
- public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-
- }
- });
-
-
-
-
+ consumer.subscribe(Arrays.asList("test"), new NoOpConsumerRebalanceListener());
assertEquals(0, consumer.poll(1000).count());
ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 9eb2a27..490578e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
@@ -72,7 +73,7 @@ public class CoordinatorTest {
private Metrics metrics;
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
private ConsumerNetworkClient consumerClient;
- private MockSubscriptionListener subscriptionListener;
+ private MockRebalanceListener subscriptionListener;
private MockCommitCallback defaultOffsetCommitCallback;
private Coordinator coordinator;
@@ -84,7 +85,7 @@ public class CoordinatorTest {
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
this.metrics = new Metrics(time);
- this.subscriptionListener = new MockSubscriptionListener();
+ this.subscriptionListener = new MockRebalanceListener();
this.defaultOffsetCommitCallback = new MockCommitCallback();
client.setNode(node);
@@ -533,7 +534,7 @@ public class CoordinatorTest {
}
}
- private static class MockSubscriptionListener extends SubscriptionState.RebalanceListener {
+ private static class MockRebalanceListener implements ConsumerRebalanceListener {
public Collection<TopicPartition> revoked;
public Collection<TopicPartition> assigned;
public int revokedCount = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d79a10e..a4f6c19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
@@ -56,7 +57,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class FetcherTest {
- private SubscriptionState.RebalanceListener listener = new SubscriptionState.RebalanceListener();
+ private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
private String topicName = "test";
private String groupId = "test-group";
private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index eb2c570..a279faa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -26,18 +26,17 @@ import java.util.Collection;
import java.util.Collections;
import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
public class SubscriptionStateTest {
- private SubscriptionState.RebalanceListener listener = new SubscriptionState.RebalanceListener();
private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private final TopicPartition tp0 = new TopicPartition("test", 0);
private final TopicPartition tp1 = new TopicPartition("test", 1);
- private final MockSubscriptionListener mockSubscriptionListener = new
- MockSubscriptionListener();
+ private final MockRebalanceListener rebalanceListener = new MockRebalanceListener();
@Test
public void partitionAssignment() {
@@ -73,7 +72,7 @@ public class SubscriptionStateTest {
@Test
public void topicSubscription() {
- state.subscribe(Arrays.asList("test"), listener);
+ state.subscribe(Arrays.asList("test"), rebalanceListener);
assertEquals(1, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
@@ -102,7 +101,7 @@ public class SubscriptionStateTest {
@Test
public void topicUnsubscription() {
final String topic = "test";
- state.subscribe(Arrays.asList(topic), listener);
+ state.subscribe(Arrays.asList(topic), rebalanceListener);
assertEquals(1, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
@@ -114,21 +113,21 @@ public class SubscriptionStateTest {
assertFalse(state.isAssigned(tp0));
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
- state.subscribe(Arrays.<String>asList(), listener);
+ state.subscribe(Arrays.<String>asList(), rebalanceListener);
assertEquals(0, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
}
@Test(expected = IllegalStateException.class)
public void invalidConsumedPositionUpdate() {
- state.subscribe(Arrays.asList("test"), listener);
+ state.subscribe(Arrays.asList("test"), rebalanceListener);
state.changePartitionAssignment(asList(tp0));
state.consumed(tp0, 0);
}
@Test(expected = IllegalStateException.class)
public void invalidFetchPositionUpdate() {
- state.subscribe(Arrays.asList("test"), listener);
+ state.subscribe(Arrays.asList("test"), rebalanceListener);
state.changePartitionAssignment(asList(tp0));
state.fetched(tp0, 0);
}
@@ -151,49 +150,49 @@ public class SubscriptionStateTest {
@Test(expected = IllegalStateException.class)
public void cantSubscribeTopicAndPattern() {
- state.subscribe(Arrays.asList("test"), mockSubscriptionListener);
- state.subscribe(Pattern.compile(".*"), listener);
+ state.subscribe(Arrays.asList("test"), rebalanceListener);
+ state.subscribe(Pattern.compile(".*"), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePartitionAndPattern() {
state.assign(Arrays.asList(new TopicPartition("test", 0)));
- state.subscribe(Pattern.compile(".*"), listener);
+ state.subscribe(Pattern.compile(".*"), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePatternAndTopic() {
- state.subscribe(Pattern.compile(".*"), listener);
- state.subscribe(Arrays.asList("test"), mockSubscriptionListener);
+ state.subscribe(Pattern.compile(".*"), rebalanceListener);
+ state.subscribe(Arrays.asList("test"), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePatternAndPartition() {
- state.subscribe(Pattern.compile(".*"), listener);
+ state.subscribe(Pattern.compile(".*"), rebalanceListener);
state.assign(Arrays.asList(new TopicPartition("test", 0)));
}
@Test
public void patternSubscription() {
- state.subscribe(Pattern.compile(".*"), listener);
+ state.subscribe(Pattern.compile(".*"), rebalanceListener);
state.changeSubscription(Arrays.asList("test", "test1"));
assertEquals(
- "Expected subscribed topics count is incorrect", 2, state.subscription().size());
+ "Expected subscribed topics count is incorrect", 2, state.subscription().size());
}
@Test
public void patternUnsubscription() {
- state.subscribe(Pattern.compile(".*"), listener);
+ state.subscribe(Pattern.compile(".*"), rebalanceListener);
state.changeSubscription(Arrays.asList("test", "test1"));
state.unsubscribe();
assertEquals(
- "Expected subscribed topics count is incorrect", 0, state.subscription().size());
+ "Expected subscribed topics count is incorrect", 0, state.subscription().size());
}
- private static class MockSubscriptionListener extends SubscriptionState.RebalanceListener {
+ private static class MockRebalanceListener implements ConsumerRebalanceListener {
public Collection<TopicPartition> revoked;
public Collection<TopicPartition> assigned;
public int revokedCount = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 1be4b23..8267030 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -107,10 +107,10 @@ object ConsumerPerformance {
val joinTimeout = 10000
val isAssigned = new AtomicBoolean(false)
consumer.subscribe(topics, new ConsumerRebalanceListener {
- def onPartitionsAssigned(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+ def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
isAssigned.set(true)
}
- def onPartitionsRevoked(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+ def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
isAssigned.set(false)
}})
val joinStart = System.currentTimeMillis()
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec88f7f/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 962d2b8..1b1973f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -428,11 +428,11 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
private class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
var callsToAssigned = 0
var callsToRevoked = 0
- def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+ def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) {
info("onPartitionsAssigned called.")
callsToAssigned += 1
}
- def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+ def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) {
info("onPartitionsRevoked called.")
callsToRevoked += 1
}