You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/27 02:18:48 UTC
[2/2] kafka git commit: KAFKA-2388: refactor KafkaConsumer subscribe
API
KAFKA-2388: refactor KafkaConsumer subscribe API
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Edward Ribeiro, Onur Karaman, Ismael Juma, Guozhang Wang
Closes #139 from hachikuji/KAFKA-2388 and squashes the following commits:
377c67e [Jason Gustafson] KAFKA-2388; refactor KafkaConsumer subscribe API
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/35eaef7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/35eaef7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/35eaef7b
Branch: refs/heads/trunk
Commit: 35eaef7bb4ebcf6b209312db774564451b052ca9
Parents: 03f850f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Aug 26 17:20:51 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Aug 26 17:20:51 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/Metadata.java | 19 +-
.../apache/kafka/clients/consumer/Consumer.java | 20 +--
.../kafka/clients/consumer/ConsumerConfig.java | 12 --
.../consumer/ConsumerRebalanceCallback.java | 95 ----------
.../consumer/ConsumerRebalanceListener.java | 97 ++++++++++
.../kafka/clients/consumer/KafkaConsumer.java | 175 +++++++++----------
.../kafka/clients/consumer/MockConsumer.java | 27 ++-
.../clients/consumer/internals/Coordinator.java | 30 ++--
.../NoOpConsumerRebalanceCallback.java | 30 ----
.../NoOpConsumerRebalanceListener.java | 30 ++++
.../consumer/internals/SubscriptionState.java | 158 +++++++++++------
.../clients/consumer/KafkaConsumerTest.java | 2 +-
.../clients/consumer/MockConsumerTest.java | 25 ++-
.../consumer/internals/CoordinatorTest.java | 47 ++---
.../clients/consumer/internals/FetcherTest.java | 29 +--
.../internals/SubscriptionStateTest.java | 30 ++--
.../kafka/copycat/runtime/WorkerSinkTask.java | 6 +-
.../copycat/runtime/WorkerSinkTaskTest.java | 2 +-
.../scala/kafka/consumer/BaseConsumer.scala | 3 +-
.../scala/kafka/tools/ConsumerPerformance.scala | 2 +-
.../kafka/api/ConsumerBounceTest.scala | 4 +-
.../integration/kafka/api/ConsumerTest.scala | 83 ++++-----
.../integration/kafka/api/QuotasTest.scala | 2 +-
.../integration/kafka/api/SSLConsumerTest.scala | 31 +---
.../kafka/consumer/PartitionAssignorTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 10 +-
26 files changed, 498 insertions(+), 473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0387f26..3f3a5f5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -12,14 +12,15 @@
*/
package org.apache.kafka.clients;
-import java.util.HashSet;
-import java.util.Set;
-
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* A class encapsulating some of the logic around metadata.
* <p>
@@ -118,12 +119,14 @@ public final class Metadata {
}
/**
- * Add one or more topics to maintain metadata for
+ * Replace the current set of topics maintained with the one provided
+ * @param topics The topics to maintain metadata for
*/
- public synchronized void addTopics(String... topics) {
- for (String topic : topics)
- this.topics.add(topic);
- requestUpdate();
+ public synchronized void setTopics(Collection<String> topics) {
+ if (!this.topics.containsAll(topics))
+ requestUpdate();
+ this.topics.clear();
+ this.topics.addAll(topics);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 76834ad..509918c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -31,29 +31,29 @@ import org.apache.kafka.common.annotation.InterfaceStability;
public interface Consumer<K, V> extends Closeable {
/**
- * @see KafkaConsumer#subscriptions()
+ * @see KafkaConsumer#assignment()
*/
- public Set<TopicPartition> subscriptions();
+ public Set<TopicPartition> assignment();
/**
- * @see KafkaConsumer#subscribe(String...)
+ * @see KafkaConsumer#subscription()
*/
- public void subscribe(String... topics);
+ public Set<String> subscription();
/**
- * @see KafkaConsumer#subscribe(TopicPartition...)
+ * @see KafkaConsumer#subscribe(List)
*/
- public void subscribe(TopicPartition... partitions);
+ public void subscribe(List<String> topics);
/**
- * @see KafkaConsumer#unsubscribe(String...)
+ * @see KafkaConsumer#subscribe(List, ConsumerRebalanceListener)
*/
- public void unsubscribe(String... topics);
+ public void subscribe(List<String> topics, ConsumerRebalanceListener callback);
/**
- * @see KafkaConsumer#unsubscribe(TopicPartition...)
+ * @see KafkaConsumer#assign(List)
*/
- public void unsubscribe(TopicPartition... partitions);
+ public void assign(List<TopicPartition> partitions);
/**
* @see KafkaConsumer#poll(long)
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 9c9510a..b9a2d4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -13,7 +13,6 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -144,12 +143,6 @@ public class ConsumerConfig extends AbstractConfig {
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
/**
- * <code>rebalance.callback.class</code>
- */
- public static final String CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG = "rebalance.callback.class";
- private static final String CONSUMER_REBALANCE_CALLBACK_CLASS_DOC = "A user-provided callback to execute when partition assignments change.";
-
- /**
* <code>check.crcs</code>
*/
public static final String CHECK_CRCS_CONFIG = "check.crcs";
@@ -259,11 +252,6 @@ public class ConsumerConfig extends AbstractConfig {
in("latest", "earliest", "none"),
Importance.MEDIUM,
AUTO_OFFSET_RESET_DOC)
- .define(CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
- Type.CLASS,
- NoOpConsumerRebalanceCallback.class,
- Importance.LOW,
- CONSUMER_REBALANCE_CALLBACK_CLASS_DOC)
.define(CHECK_CRCS_CONFIG,
Type.BOOLEAN,
true,
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
deleted file mode 100644
index ff3f50f..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import java.util.Collection;
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the
- * consumer changes.
- * <p>
- * This is applicable when the consumer is having Kafka auto-manage group membership, if the consumer's directly subscribe to partitions
- * those partitions will never be reassigned and this callback is not applicable.
- * <p>
- * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group changes or the subscription
- * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
- * <p>
- * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
- * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
- * the offset gets saved.
- * <p>
- * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
- * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
- * number of page views per users for each five minute window. Let's say the topic is partitioned by the user id so that
- * all events for a particular user will go to a single consumer instance. The consumer can keep in memory a running
- * tally of actions per user and only flush these out to a remote data store when it's cache gets to big. However if a
- * partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over
- * consumption.
- * <p>
- * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
- * <p>
- * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to
- * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
- * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
- * partition has their {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
- * <p>
- * Here is pseudo-code for a callback implementation for saving offsets:
- * <pre>
- * {@code
- * public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
- * public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- * // read the offsets from an external store using some custom code not described here
- * for(TopicPartition partition: partitions)
- * consumer.position(partition, readOffsetFromExternalStore(partition));
- * }
- * public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- * // save the offsets in an external store using some custom code not described here
- * for(TopicPartition partition: partitions)
- * saveOffsetInExternalStore(consumer.position(partition));
- * }
- * }
- * }
- * </pre>
- */
-public interface ConsumerRebalanceCallback {
-
- /**
- * A callback method the user can implement to provide handling of customized offsets on completion of a successful
- * partition re-assignement. This method will be called after an offset re-assignement completes and before the
- * consumer starts fetching data.
- * <p>
- * It is guaranteed that all the processes in a consumer group will execute their
- * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
- * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
- *
- * @param consumer Reference to the consumer for convenience
- * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
- * assigned to the consumer)
- */
- public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
-
- /**
- * A callback method the user can implement to provide handling of offset commits to a customized store on the start
- * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
- * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
- * custom offset store to prevent duplicate data
- * <p>
- * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
- *
- * @param consumer Reference to the consumer for convenience
- * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
- */
- public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
new file mode 100644
index 0000000..2f2591c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collection;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the
+ * consumer changes.
+ * <p>
+ * This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer directly assigns partitions,
+ * those partitions will never be reassigned and this callback is not applicable.
+ * <p>
+ * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group changes or the subscription
+ * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
+ * Rebalances can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is
+ * administratively adjusted).
+ * <p>
+ * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
+ * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
+ * the offset gets saved.
+ * <p>
+ * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
+ * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
+ * number of page views per users for each five minute window. Let's say the topic is partitioned by the user id so that
+ * all events for a particular user will go to a single consumer instance. The consumer can keep in memory a running
+ * tally of actions per user and only flush these out to a remote data store when its cache gets too big. However if a
+ * partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over
+ * consumption.
+ * <p>
+ * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
+ * <p>
+ * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to
+ * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
+ * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
+ * partition has their {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
+ * <p>
+ * Here is pseudo-code for a callback implementation for saving offsets:
+ * <pre>
+ * {@code
+ * public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
+ * public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ * // read the offsets from an external store using some custom code not described here
+ * for(TopicPartition partition: partitions)
+ * consumer.seek(partition, readOffsetFromExternalStore(partition));
+ * }
+ * public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ * // save the offsets in an external store using some custom code not described here
+ * for(TopicPartition partition: partitions)
+ * saveOffsetInExternalStore(consumer.position(partition));
+ * }
+ * }
+ * }
+ * </pre>
+ */
+public interface ConsumerRebalanceListener {
+
+ /**
+ * A callback method the user can implement to provide handling of customized offsets on completion of a successful
+ * partition re-assignment. This method will be called after an offset re-assignment completes and before the
+ * consumer starts fetching data.
+ * <p>
+ * It is guaranteed that all the processes in a consumer group will execute their
+ * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
+ * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
+ *
+ * @param consumer Reference to the consumer for convenience
+ * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
+ * assigned to the consumer)
+ */
+ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+
+ /**
+ * A callback method the user can implement to provide handling of offset commits to a customized store on the start
+ * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
+ * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
+ * custom offset store to prevent duplicate data
+ * <p>
+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
+ *
+ * @param consumer Reference to the consumer for convenience
+ * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
+ */
+ public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8b54acd..938981c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Coordinator;
import org.apache.kafka.clients.consumer.internals.DelayedTask;
import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -46,6 +47,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -229,8 +231,8 @@ import static org.apache.kafka.common.utils.Utils.min;
* String topic = "foo";
* TopicPartition partition0 = new TopicPartition(topic, 0);
* TopicPartition partition1 = new TopicPartition(topic, 1);
- * consumer.subscribe(partition0);
- * consumer.subscribe(partition1);
+ * // assign() takes a list of partitions
+ * consumer.assign(Arrays.asList(partition0, partition1));
* </pre>
*
* The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
@@ -272,13 +274,13 @@ import static org.apache.kafka.common.utils.Utils.min;
* search index use case described above). If the partition assignment is done automatically special care will also be
* needed to handle the case where partition assignments change. This can be handled using a special callback specified
* using <code>rebalance.callback.class</code>, which specifies an implementation of the interface
- * {@link ConsumerRebalanceCallback}. When partitions are taken from a consumer the consumer will want to commit its
+ * {@link ConsumerRebalanceListener}. When partitions are taken from a consumer the consumer will want to commit its
* offset for those partitions by implementing
- * {@link ConsumerRebalanceCallback#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
* consumer, the consumer will want to look up the offset for those new partitions an correctly initialize the consumer
- * to that position by implementing {@link ConsumerRebalanceCallback#onPartitionsAssigned(Consumer, Collection)}.
+ * to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Consumer, Collection)}.
* <p>
- * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for
+ * Another common use for {@link ConsumerRebalanceListener} is to flush any caches the application maintains for
* partitions that are moved elsewhere.
*
* <h4>Controlling The Consumer's Position</h4>
@@ -437,29 +439,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param configs The consumer configs
*/
public KafkaConsumer(Map<String, Object> configs) {
- this(configs, null, null, null);
+ this(configs, null, null);
}
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration, a
- * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
+ * {@link ConsumerRebalanceListener} implementation, a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
*
* @param configs The consumer configs
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and
- * end of every rebalance operation.
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Map<String, Object> configs,
- ConsumerRebalanceCallback callback,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
- callback,
keyDeserializer,
valueDeserializer);
}
@@ -471,43 +469,35 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* {@link ConsumerConfig}
*/
public KafkaConsumer(Properties properties) {
- this(properties, null, null, null);
+ this(properties, null, null);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
- * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
+ * key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
*
* @param properties The consumer configuration properties
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and
- * end of every rebalance operation.
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Properties properties,
- ConsumerRebalanceCallback callback,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
- callback,
keyDeserializer,
valueDeserializer);
}
@SuppressWarnings("unchecked")
private KafkaConsumer(ConsumerConfig config,
- ConsumerRebalanceCallback callback,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
try {
log.debug("Starting the Kafka consumer");
- if (callback == null)
- callback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
- ConsumerRebalanceCallback.class);
this.time = new SystemTime();
this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
@@ -552,8 +542,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
metricsTags,
this.time,
requestTimeoutMs,
- retryBackoffMs,
- wrapRebalanceCallback(callback));
+ retryBackoffMs);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
@@ -600,23 +589,41 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to
- * partitions using {@link #subscribe(TopicPartition...)} then this will simply return the list of partitions that
- * were subscribed to. If subscription was done by specifying only the topic using {@link #subscribe(String...)}
- * then this will give the set of topics currently assigned to the consumer (which may be none if the assignment
- * hasn't happened yet, or the partitions are in the process of getting reassigned).
+ * The set of partitions currently assigned to this consumer. If subscription happened by directly assigning
+ * partitions using {@link #assign(List)} then this will simply return the same partitions that
+ * were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
+ * to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
+ * process of getting reassigned).
+ * @return The set of partitions currently assigned to this consumer
*/
- public Set<TopicPartition> subscriptions() {
+ public Set<TopicPartition> assignment() {
acquire();
try {
- return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+ return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
} finally {
release();
}
}
/**
- * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
+ * Get the current subscription. Will return the same topics used in the most recent call to
+ * {@link #subscribe(List, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
+ * @return The set of topics currently subscribed to
+ */
+ public Set<String> subscription() {
+ acquire();
+ try {
+ return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
+ } finally {
+ release();
+ }
+ }
+
+ /**
+ * Subscribe to the given list of topics and use the consumer's group management functionality to
+ * assign partitions. Topic subscriptions are not incremental. This list will replace the current
+ * assignment (if there is one). Note that it is not possible to combine topic subscription (which uses group management)
+ * with manual partition assignment through {@link #assign(List)}.
* <p>
* As part of group management, the consumer will keep track of the list of consumers that belong to a particular
* group and will trigger a rebalance operation if one of the following events trigger -
@@ -626,82 +633,76 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* <li>An existing member of the consumer group dies
* <li>A new member is added to an existing consumer group via the join API
* </ul>
+ * <p>
+ * When any of these events are triggered, the provided listener will be invoked first to indicate that
+ * the consumer's assignment has been revoked, and then again when the new assignment has been received.
+ * Note that this listener will immediately override any listener set in a previous call to subscribe.
+ * It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics
+ * subscribed in this call. See {@link ConsumerRebalanceListener} for more details.
*
- * @param topics A variable list of topics that the consumer wants to subscribe to
+ * @param topics The list of topics to subscribe to
+ * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
+ * subscribed topics
*/
@Override
- public void subscribe(String... topics) {
+ public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
acquire();
try {
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
- for (String topic : topics)
- this.subscriptions.subscribe(topic);
- metadata.addTopics(topics);
+ this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
+ metadata.setTopics(topics);
} finally {
release();
}
}
/**
- * Incrementally subscribes to a specific topic partition and does not use the consumer's group management
- * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
- * metadata change.
+ * Subscribe to the given list of topics and use the consumer's group management functionality to
+ * assign partitions. Topic subscriptions are not incremental. This list will replace the current
+ * assignment (if there is one). It is not possible to combine topic subscription with group management
+ * with manual partition assignment through {@link #assign(List)}.
* <p>
+ * This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which
+ * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer
+ * {@link #subscribe(List, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
+ * to be reset. You should also prefer to provide your own listener if you are doing your own offset
+ * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
*
- * @param partitions Partitions to incrementally subscribe to
+ * @param topics The list of topics to subscribe to
*/
@Override
- public void subscribe(TopicPartition... partitions) {
- acquire();
- try {
- log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
- for (TopicPartition tp : partitions) {
- this.subscriptions.subscribe(tp);
- metadata.addTopics(tp.topic());
- }
- } finally {
- release();
- }
- }
-
- /**
- * Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not
- * be returned from the next {@link #poll(long) poll()} onwards
- *
- * @param topics Topics to unsubscribe from
- */
- public void unsubscribe(String... topics) {
- acquire();
- try {
- log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
- // throw an exception if the topic was never subscribed to
- for (String topic : topics)
- this.subscriptions.unsubscribe(topic);
- } finally {
- release();
- }
+ public void subscribe(List<String> topics) {
+ subscribe(topics, new NoOpConsumerRebalanceListener());
}
/**
- * Unsubscribe from the specific topic partitions. records for these partitions will not be returned from the next
- * {@link #poll(long) poll()} onwards
+ * Assign a list of partitions to this consumer. This interface does not allow for incremental assignment
+ * and will replace the previous assignment (if there is one).
+ * <p>
+ * Manual topic assignment through this method does not use the consumer's group management
+ * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
+ * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(List)}
+ * and group assignment with {@link #subscribe(List, ConsumerRebalanceListener)}.
*
- * @param partitions Partitions to unsubscribe from
+ * @param partitions The list of partitions to assign this consumer
*/
- public void unsubscribe(TopicPartition... partitions) {
+ @Override
+ public void assign(List<TopicPartition> partitions) {
acquire();
try {
- log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
- // throw an exception if the partition was never subscribed to
- for (TopicPartition partition : partitions)
- this.subscriptions.unsubscribe(partition);
+ log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
+ this.subscriptions.assign(partitions);
+ Set<String> topics = new HashSet<>();
+ for (TopicPartition tp : partitions)
+ topics.add(tp.topic());
+ metadata.setTopics(topics);
} finally {
release();
}
}
/**
- * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have
+ * Fetches data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
* subscribed to any topics or partitions before polling for data.
* <p>
* The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
@@ -736,7 +737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// handling the fetched records.
fetcher.initFetches(metadata.fetch());
client.poll(0);
- return new ConsumerRecords<K, V>(records);
+ return new ConsumerRecords<>(records);
}
remaining -= end - start;
@@ -1117,20 +1118,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
- private Coordinator.RebalanceCallback wrapRebalanceCallback(final ConsumerRebalanceCallback callback) {
- return new Coordinator.RebalanceCallback() {
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- callback.onPartitionsAssigned(KafkaConsumer.this, partitions);
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- callback.onPartitionsRevoked(KafkaConsumer.this, partitions);
- }
- };
- }
-
/**
* Set the fetch position to the committed position (if there is one)
* or reset it using the offset reset policy the user has configured.
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index b07e760..e33f120 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -12,6 +12,7 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -48,34 +49,30 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
- public synchronized Set<TopicPartition> subscriptions() {
+ public synchronized Set<TopicPartition> assignment() {
return this.subscriptions.assignedPartitions();
}
@Override
- public synchronized void subscribe(String... topics) {
- ensureNotClosed();
- for (String topic : topics)
- this.subscriptions.subscribe(topic);
+ public synchronized Set<String> subscription() {
+ return this.subscriptions.subscription();
}
@Override
- public synchronized void subscribe(TopicPartition... partitions) {
- ensureNotClosed();
- for (TopicPartition partition : partitions)
- this.subscriptions.subscribe(partition);
+ public synchronized void subscribe(List<String> topics) {
+ subscribe(topics, new NoOpConsumerRebalanceListener());
}
- public synchronized void unsubscribe(String... topics) {
+ @Override
+ public synchronized void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
ensureNotClosed();
- for (String topic : topics)
- this.subscriptions.unsubscribe(topic);
+ this.subscriptions.subscribe(topics, SubscriptionState.wrapListener(this, listener));
}
- public synchronized void unsubscribe(TopicPartition... partitions) {
+ @Override
+ public synchronized void assign(List<TopicPartition> partitions) {
ensureNotClosed();
- for (TopicPartition partition : partitions)
- this.subscriptions.unsubscribe(partition);
+ this.subscriptions.assign(partitions);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 70442aa..b804796 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -46,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -72,7 +71,6 @@ public final class Coordinator {
private final CoordinatorMetrics sensors;
private final long requestTimeoutMs;
private final long retryBackoffMs;
- private final RebalanceCallback rebalanceCallback;
private Node consumerCoordinator;
private String consumerId;
private int generation;
@@ -92,8 +90,7 @@ public final class Coordinator {
Map<String, String> metricTags,
Time time,
long requestTimeoutMs,
- long retryBackoffMs,
- RebalanceCallback rebalanceCallback) {
+ long retryBackoffMs) {
this.client = client;
this.time = time;
@@ -109,7 +106,6 @@ public final class Coordinator {
this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
this.requestTimeoutMs = requestTimeoutMs;
this.retryBackoffMs = retryBackoffMs;
- this.rebalanceCallback = rebalanceCallback;
}
/**
@@ -159,25 +155,27 @@ public final class Coordinator {
if (!subscriptions.partitionAssignmentNeeded())
return;
- // execute the user's callback before rebalance
+ SubscriptionState.RebalanceListener listener = subscriptions.listener();
+
+ // execute the user's listener before rebalance
log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
try {
- Set<TopicPartition> revoked = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
- rebalanceCallback.onPartitionsRevoked(revoked);
+ Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
+ listener.onPartitionsRevoked(revoked);
} catch (Exception e) {
- log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+ log.error("User provided listener " + listener.underlying().getClass().getName()
+ " failed on partition revocation: ", e);
}
reassignPartitions();
- // execute the user's callback after rebalance
+ // execute the user's listener after rebalance
log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
try {
- Set<TopicPartition> assigned = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
- rebalanceCallback.onPartitionsAssigned(assigned);
+ Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
+ listener.onPartitionsAssigned(assigned);
} catch (Exception e) {
- log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+ log.error("User provided listener " + listener.underlying().getClass().getName()
+ " failed on partition assignment: ", e);
}
}
@@ -293,7 +291,7 @@ public final class Coordinator {
return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator
- List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscribedTopics());
+ List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscription());
log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
JoinGroupRequest request = new JoinGroupRequest(groupId,
@@ -713,10 +711,6 @@ public final class Coordinator {
}
}
- public interface RebalanceCallback {
- void onPartitionsAssigned(Collection<TopicPartition> partitions);
- void onPartitionsRevoked(Collection<TopicPartition> partitions);
- }
private class CoordinatorMetrics {
public final Metrics metrics;
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
deleted file mode 100644
index c06ab3a..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer.internals;
-
-import java.util.Collection;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
-import org.apache.kafka.common.TopicPartition;
-
-public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
-
- @Override
- public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
-
- @Override
- public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
new file mode 100644
index 0000000..d88d71c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceListener.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Collection;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+public class NoOpConsumerRebalanceListener implements ConsumerRebalanceListener {
+
+ @Override
+ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+
+ @Override
+ public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6788ee6..ec6b424 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -12,19 +12,22 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A class for tracking the topics, partitions, and offsets for the consumer. A partition
- * is "assigned" either directly with {@link #subscribe(TopicPartition)} (manual assignment)
+ * is "assigned" either directly with {@link #assign(List)} (manual assignment)
* or with {@link #changePartitionAssignment(List)} (automatic assignment).
*
* Once assigned, the partition is not considered "fetchable" until its initial position has
@@ -36,8 +39,7 @@ import java.util.Set;
* used. You can also query the pause state independently with {@link #isPaused(TopicPartition)}.
*
* Note that pause state as well as fetch/consumed positions are not preserved when partition
- * assignment is changed either with {@link #unsubscribe(TopicPartition)} or
- * {@link #changePartitionAssignment(List)}.
+ * assignment is changed whether directly by the user or through a group rebalance.
*
* This class also maintains a cache of the latest commit position for each of the assigned
* partitions. This is updated through {@link #committed(TopicPartition, long)} and can be used
@@ -46,13 +48,13 @@ import java.util.Set;
public class SubscriptionState {
/* the list of topics the user has requested */
- private final Set<String> subscribedTopics;
+ private final Set<String> subscription;
/* the list of partitions the user has requested */
- private final Set<TopicPartition> subscribedPartitions;
+ private final Set<TopicPartition> userAssignment;
/* the list of partitions currently assigned */
- private final Map<TopicPartition, TopicPartitionState> assignedPartitions;
+ private final Map<TopicPartition, TopicPartitionState> assignment;
/* do we need to request a partition assignment from the coordinator? */
private boolean needsPartitionAssignment;
@@ -63,64 +65,66 @@ public class SubscriptionState {
/* Default offset reset strategy */
private final OffsetResetStrategy defaultResetStrategy;
+ /* Listener to be invoked when assignment changes */
+ private RebalanceListener listener;
+
public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
this.defaultResetStrategy = defaultResetStrategy;
- this.subscribedTopics = new HashSet<>();
- this.subscribedPartitions = new HashSet<>();
- this.assignedPartitions = new HashMap<>();
+ this.subscription = new HashSet<>();
+ this.userAssignment = new HashSet<>();
+ this.assignment = new HashMap<>();
this.needsPartitionAssignment = false;
this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
}
- public void subscribe(String topic) {
- if (!this.subscribedPartitions.isEmpty())
+ public void subscribe(List<String> topics, RebalanceListener listener) {
+ if (listener == null)
+ throw new IllegalArgumentException("RebalanceListener cannot be null");
+
+ if (!this.userAssignment.isEmpty())
throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
- if (!this.subscribedTopics.contains(topic)) {
- this.subscribedTopics.add(topic);
+
+ this.listener = listener;
+
+ if (!this.subscription.equals(new HashSet<>(topics))) {
+ this.subscription.clear();
+ this.subscription.addAll(topics);
this.needsPartitionAssignment = true;
- }
- }
- public void unsubscribe(String topic) {
- if (!this.subscribedTopics.contains(topic))
- throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
- this.subscribedTopics.remove(topic);
- this.needsPartitionAssignment = true;
- final List<TopicPartition> existingAssignedPartitions = new ArrayList<>(assignedPartitions());
- for (TopicPartition tp: existingAssignedPartitions)
- if (topic.equals(tp.topic()))
- clearPartition(tp);
+ // Remove any assigned partitions which are no longer subscribed to
+ for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
+ TopicPartition tp = it.next();
+ if (!subscription.contains(tp.topic()))
+ it.remove();
+ }
+ }
}
public void needReassignment() {
this.needsPartitionAssignment = true;
}
- public void subscribe(TopicPartition tp) {
- if (!this.subscribedTopics.isEmpty())
+ public void assign(List<TopicPartition> partitions) {
+ if (!this.subscription.isEmpty())
throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
- this.subscribedPartitions.add(tp);
- addAssignedPartition(tp);
- }
- public void unsubscribe(TopicPartition partition) {
- if (!subscribedPartitions.contains(partition))
- throw new IllegalStateException("Partition " + partition + " was never subscribed to.");
- subscribedPartitions.remove(partition);
- clearPartition(partition);
- }
-
- private void clearPartition(TopicPartition tp) {
- this.assignedPartitions.remove(tp);
+ this.userAssignment.clear();
+ this.userAssignment.addAll(partitions);
+
+ for (TopicPartition partition : partitions)
+ if (!assignment.containsKey(partition))
+ addAssignedPartition(partition);
+
+ this.assignment.keySet().retainAll(this.userAssignment);
}
public void clearAssignment() {
- this.assignedPartitions.clear();
- this.needsPartitionAssignment = !subscribedTopics().isEmpty();
+ this.assignment.clear();
+ this.needsPartitionAssignment = !subscription().isEmpty();
}
- public Set<String> subscribedTopics() {
- return this.subscribedTopics;
+ public Set<String> subscription() {
+ return this.subscription;
}
public Long fetched(TopicPartition tp) {
@@ -132,7 +136,7 @@ public class SubscriptionState {
}
private TopicPartitionState assignedState(TopicPartition tp) {
- TopicPartitionState state = this.assignedPartitions.get(tp);
+ TopicPartitionState state = this.assignment.get(tp);
if (state == null)
throw new IllegalStateException("No current assignment for partition " + tp);
return state;
@@ -163,12 +167,12 @@ public class SubscriptionState {
}
public Set<TopicPartition> assignedPartitions() {
- return this.assignedPartitions.keySet();
+ return this.assignment.keySet();
}
public Set<TopicPartition> fetchablePartitions() {
Set<TopicPartition> fetchable = new HashSet<>();
- for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
+ for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
if (entry.getValue().isFetchable())
fetchable.add(entry.getKey());
}
@@ -176,7 +180,7 @@ public class SubscriptionState {
}
public boolean partitionsAutoAssigned() {
- return !this.subscribedTopics.isEmpty();
+ return !this.subscription.isEmpty();
}
public void consumed(TopicPartition tp, long offset) {
@@ -189,7 +193,7 @@ public class SubscriptionState {
public Map<TopicPartition, Long> allConsumed() {
Map<TopicPartition, Long> allConsumed = new HashMap<>();
- for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
+ for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
TopicPartitionState state = entry.getValue();
if (state.hasValidPosition)
allConsumed.put(entry.getKey(), state.consumed);
@@ -214,15 +218,15 @@ public class SubscriptionState {
}
public boolean hasAllFetchPositions() {
- for (TopicPartitionState state : assignedPartitions.values())
+ for (TopicPartitionState state : assignment.values())
if (!state.hasValidPosition)
return false;
return true;
}
public Set<TopicPartition> missingFetchPositions() {
- Set<TopicPartition> missing = new HashSet<>(this.assignedPartitions.keySet());
- for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet())
+ Set<TopicPartition> missing = new HashSet<>(this.assignment.keySet());
+ for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet())
if (!entry.getValue().hasValidPosition)
missing.add(entry.getKey());
return missing;
@@ -234,7 +238,7 @@ public class SubscriptionState {
public void changePartitionAssignment(List<TopicPartition> assignments) {
for (TopicPartition tp : assignments)
- if (!this.subscribedTopics.contains(tp.topic()))
+ if (!this.subscription.contains(tp.topic()))
throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
this.clearAssignment();
for (TopicPartition tp: assignments)
@@ -243,7 +247,7 @@ public class SubscriptionState {
}
public boolean isAssigned(TopicPartition tp) {
- return assignedPartitions.containsKey(tp);
+ return assignment.containsKey(tp);
}
public boolean isPaused(TopicPartition tp) {
@@ -263,7 +267,11 @@ public class SubscriptionState {
}
private void addAssignedPartition(TopicPartition tp) {
- this.assignedPartitions.put(tp, new TopicPartitionState());
+ this.assignment.put(tp, new TopicPartitionState());
+ }
+
+ public RebalanceListener listener() {
+ return listener;
}
private static class TopicPartitionState {
@@ -332,4 +340,46 @@ public class SubscriptionState {
}
+ public static RebalanceListener wrapListener(final Consumer<?, ?> consumer,
+ final ConsumerRebalanceListener listener) {
+ if (listener == null)
+ throw new IllegalArgumentException("ConsumerRebalanceLister must not be null");
+
+ return new RebalanceListener() {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ listener.onPartitionsAssigned(consumer, partitions);
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ listener.onPartitionsRevoked(consumer, partitions);
+ }
+
+ @Override
+ public ConsumerRebalanceListener underlying() {
+ return listener;
+ }
+ };
+ }
+
+ /**
+ * Wrapper around {@link ConsumerRebalanceListener} to get around the need to provide a reference
+ * to the consumer in this class.
+ */
+ public static class RebalanceListener {
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+
+ }
+
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+
+ }
+
+ public ConsumerRebalanceListener underlying() {
+ return null;
+ }
+ }
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 738f3ed..7625218 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -37,7 +37,7 @@ public class KafkaConsumerTest {
final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
try {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
- props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
} catch (KafkaException e) {
Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index d4da642..6e6a118 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -16,12 +16,15 @@
*/
package org.apache.kafka.clients.consumer;
-import static org.junit.Assert.*;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class MockConsumerTest {
@@ -29,7 +32,21 @@ public class MockConsumerTest {
@Test
public void testSimpleMock() {
- consumer.subscribe("test");
+ consumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+
+ }
+
+ @Override
+ public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+
+ }
+ });
+
+
+
+
assertEquals(0, consumer.poll(1000).count());
ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index a23b8e7..3452639 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -72,7 +73,7 @@ public class CoordinatorTest {
private Metrics metrics;
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
private ConsumerNetworkClient consumerClient;
- private MockRebalanceCallback rebalanceCallback;
+ private MockSubscriptionListener subscriptionListener;
private Coordinator coordinator;
@Before
@@ -83,7 +84,7 @@ public class CoordinatorTest {
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
this.metrics = new Metrics(time);
- this.rebalanceCallback = new MockRebalanceCallback();
+ this.subscriptionListener = new MockSubscriptionListener();
client.setNode(node);
@@ -98,8 +99,7 @@ public class CoordinatorTest {
metricTags,
time,
requestTimeoutMs,
- retryBackoffMs,
- rebalanceCallback);
+ retryBackoffMs);
}
@Test
@@ -168,7 +168,7 @@ public class CoordinatorTest {
coordinator.ensureCoordinatorKnown();
// illegal_generation will cause re-partition
- subscriptions.subscribe(topicName);
+ subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
subscriptions.changePartitionAssignment(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
@@ -192,7 +192,7 @@ public class CoordinatorTest {
coordinator.ensureCoordinatorKnown();
// illegal_generation will cause re-partition
- subscriptions.subscribe(topicName);
+ subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
subscriptions.changePartitionAssignment(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
@@ -233,7 +233,7 @@ public class CoordinatorTest {
@Test
public void testNormalJoinGroup() {
- subscriptions.subscribe(topicName);
+ subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -245,15 +245,15 @@ public class CoordinatorTest {
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
- assertEquals(1, rebalanceCallback.revokedCount);
- assertEquals(Collections.emptySet(), rebalanceCallback.revoked);
- assertEquals(1, rebalanceCallback.assignedCount);
- assertEquals(Collections.singleton(tp), rebalanceCallback.assigned);
+ assertEquals(1, subscriptionListener.revokedCount);
+ assertEquals(Collections.emptySet(), subscriptionListener.revoked);
+ assertEquals(1, subscriptionListener.assignedCount);
+ assertEquals(Collections.singleton(tp), subscriptionListener.assigned);
}
@Test
public void testReJoinGroup() {
- subscriptions.subscribe(topicName);
+ subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -266,15 +266,15 @@ public class CoordinatorTest {
coordinator.ensurePartitionAssignment();
assertFalse(subscriptions.partitionAssignmentNeeded());
assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
- assertEquals(1, rebalanceCallback.revokedCount);
- assertEquals(Collections.emptySet(), rebalanceCallback.revoked);
- assertEquals(1, rebalanceCallback.assignedCount);
- assertEquals(Collections.singleton(tp), rebalanceCallback.assigned);
+ assertEquals(1, subscriptionListener.revokedCount);
+ assertEquals(Collections.emptySet(), subscriptionListener.revoked);
+ assertEquals(1, subscriptionListener.assignedCount);
+ assertEquals(Collections.singleton(tp), subscriptionListener.assigned);
}
@Test(expected = ApiException.class)
public void testUnknownPartitionAssignmentStrategy() {
- subscriptions.subscribe(topicName);
+ subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -287,7 +287,7 @@ public class CoordinatorTest {
@Test(expected = ApiException.class)
public void testInvalidSessionTimeout() {
- subscriptions.subscribe(topicName);
+ subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
subscriptions.needReassignment();
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -431,7 +431,7 @@ public class CoordinatorTest {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
coordinator.refreshCommittedOffsetsIfNeeded();
@@ -444,7 +444,7 @@ public class CoordinatorTest {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
@@ -458,7 +458,7 @@ public class CoordinatorTest {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -473,7 +473,7 @@ public class CoordinatorTest {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
coordinator.refreshCommittedOffsetsIfNeeded();
@@ -528,7 +528,7 @@ public class CoordinatorTest {
}
}
- private static class MockRebalanceCallback implements Coordinator.RebalanceCallback {
+ private static class MockSubscriptionListener extends SubscriptionState.RebalanceListener {
public Collection<TopicPartition> revoked;
public Collection<TopicPartition> assigned;
public int revokedCount = 0;
@@ -546,5 +546,6 @@ public class CoordinatorTest {
this.revoked = partitions;
revokedCount++;
}
+
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 22712bb..f2a8381 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class FetcherTest {
+ private SubscriptionState.RebalanceListener listener = new SubscriptionState.RebalanceListener();
private String topicName = "test";
private String groupId = "test-group";
private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
@@ -107,7 +108,7 @@ public class FetcherTest {
@Test
public void testFetchNormal() {
List<ConsumerRecord<byte[], byte[]>> records;
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.seek(tp, 0);
// normal fetch
@@ -127,7 +128,7 @@ public class FetcherTest {
@Test
public void testFetchDuringRebalance() {
- subscriptions.subscribe(topicName);
+ subscriptions.subscribe(Arrays.asList(topicName), listener);
subscriptions.changePartitionAssignment(Arrays.asList(tp));
subscriptions.seek(tp, 0);
@@ -144,7 +145,7 @@ public class FetcherTest {
@Test
public void testInFlightFetchOnPausedPartition() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -157,7 +158,7 @@ public class FetcherTest {
@Test
public void testFetchOnPausedPartition() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.seek(tp, 0);
subscriptions.pause(tp);
@@ -167,7 +168,7 @@ public class FetcherTest {
@Test
public void testFetchNotLeaderForPartition() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -179,7 +180,7 @@ public class FetcherTest {
@Test
public void testFetchUnknownTopicOrPartition() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -191,7 +192,7 @@ public class FetcherTest {
@Test
public void testFetchOffsetOutOfRange() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -205,7 +206,7 @@ public class FetcherTest {
@Test
public void testFetchDisconnected() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -224,7 +225,7 @@ public class FetcherTest {
public void testUpdateFetchPositionToCommitted() {
// unless a specific reset is expected, the default behavior is to reset to the committed
// position if one is present
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.committed(tp, 5);
fetcher.updateFetchPositions(Collections.singleton(tp));
@@ -235,7 +236,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionResetToDefaultOffset() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
// with no commit position, we should reset using the default strategy defined above (EARLIEST)
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
@@ -249,7 +250,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
@@ -263,7 +264,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
@@ -277,7 +278,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionDisconnect() {
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
// First request gets a disconnect
@@ -311,7 +312,7 @@ public class FetcherTest {
@Test
public void testQuotaMetrics() throws Exception {
List<ConsumerRecord<byte[], byte[]>> records;
- subscriptions.subscribe(tp);
+ subscriptions.assign(Arrays.asList(tp));
subscriptions.seek(tp, 0);
// normal fetch
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 1ba6f7a..e4830b1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static java.util.Arrays.asList;
+import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -28,20 +29,21 @@ import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
public class SubscriptionStateTest {
-
+
+ private SubscriptionState.RebalanceListener listener = new SubscriptionState.RebalanceListener();
private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private final TopicPartition tp0 = new TopicPartition("test", 0);
private final TopicPartition tp1 = new TopicPartition("test", 1);
@Test
- public void partitionSubscription() {
- state.subscribe(tp0);
+ public void partitionAssignment() {
+ state.assign(Arrays.asList(tp0));
assertEquals(Collections.singleton(tp0), state.assignedPartitions());
state.committed(tp0, 1);
state.seek(tp0, 1);
assertTrue(state.isFetchable(tp0));
assertAllPositions(tp0, 1L);
- state.unsubscribe(tp0);
+ state.assign(Arrays.<TopicPartition>asList());
assertTrue(state.assignedPartitions().isEmpty());
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp0));
@@ -49,7 +51,7 @@ public class SubscriptionStateTest {
@Test
public void partitionReset() {
- state.subscribe(tp0);
+ state.assign(Arrays.asList(tp0));
state.seek(tp0, 5);
assertEquals(5L, (long) state.fetched(tp0));
assertEquals(5L, (long) state.consumed(tp0));
@@ -67,8 +69,8 @@ public class SubscriptionStateTest {
@Test
public void topicSubscription() {
- state.subscribe("test");
- assertEquals(1, state.subscribedTopics().size());
+ state.subscribe(Arrays.asList("test"), listener);
+ assertEquals(1, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
state.changePartitionAssignment(asList(tp0));
@@ -84,7 +86,7 @@ public class SubscriptionStateTest {
@Test
public void partitionPause() {
- state.subscribe(tp0);
+ state.assign(Arrays.asList(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.pause(tp0);
@@ -96,8 +98,8 @@ public class SubscriptionStateTest {
@Test
public void topicUnsubscription() {
final String topic = "test";
- state.subscribe(topic);
- assertEquals(1, state.subscribedTopics().size());
+ state.subscribe(Arrays.asList(topic), listener);
+ assertEquals(1, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
state.changePartitionAssignment(asList(tp0));
@@ -108,21 +110,21 @@ public class SubscriptionStateTest {
assertFalse(state.isAssigned(tp0));
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
- state.unsubscribe(topic);
- assertEquals(0, state.subscribedTopics().size());
+ state.subscribe(Arrays.<String>asList(), listener);
+ assertEquals(0, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
}
@Test(expected = IllegalStateException.class)
public void invalidConsumedPositionUpdate() {
- state.subscribe("test");
+ state.subscribe(Arrays.asList("test"), listener);
state.changePartitionAssignment(asList(tp0));
state.consumed(tp0, 0);
}
@Test(expected = IllegalStateException.class)
public void invalidFetchPositionUpdate() {
- state.subscribe("test");
+ state.subscribe(Arrays.asList("test"), listener);
state.changePartitionAssignment(asList(tp0));
state.fetched(tp0, 0);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 4eaf756..272f62a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -120,7 +120,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
**/
public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
HashMap<TopicPartition, Long> offsets = new HashMap<>();
- for (TopicPartition tp : consumer.subscriptions()) {
+ for (TopicPartition tp : consumer.assignment()) {
offsets.put(tp, consumer.position(tp));
}
// We only don't flush the task in one case: when shutting down, the task has already been
@@ -179,7 +179,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
}
log.debug("Task {} subscribing to topics {}", id, topics);
- newConsumer.subscribe(topics);
+ newConsumer.subscribe(Arrays.asList(topics));
// Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
// enable exactly once delivery to that system).
@@ -188,7 +188,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
// We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
newConsumer.poll(0);
Map<TopicPartition, Long> offsets = context.getOffsets();
- for (TopicPartition tp : newConsumer.subscriptions()) {
+ for (TopicPartition tp : newConsumer.assignment()) {
Long offset = offsets.get(tp);
if (offset != null)
newConsumer.seek(tp, offset);
http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 0c6f950..e5286e3 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -330,7 +330,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
throws Exception {
final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
- EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION));
+ EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION));
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(
new IAnswer<Long>() {
@Override