You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/30 23:23:05 UTC
kafka git commit: KAFKA-2350; KafkaConsumer pause/resume API
Repository: kafka
Updated Branches:
refs/heads/trunk 1162cc1dd -> be82a2afc
KAFKA-2350; KafkaConsumer pause/resume API
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael, Ashish, Guozhang
Closes #100 from hachikuji/KAFKA-2350 and squashes the following commits:
250e823 [Jason Gustafson] KAFKA-2350; KafkaConsumer pause/resume API
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be82a2af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be82a2af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be82a2af
Branch: refs/heads/trunk
Commit: be82a2afc9e38adc0109dc694834ca5947128877
Parents: 1162cc1
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Jul 30 14:23:43 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jul 30 14:23:43 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/Consumer.java | 10 +
.../kafka/clients/consumer/KafkaConsumer.java | 48 +++-
.../kafka/clients/consumer/MockConsumer.java | 39 ++-
.../clients/consumer/internals/Coordinator.java | 8 +-
.../clients/consumer/internals/Fetcher.java | 45 ++--
.../consumer/internals/SubscriptionState.java | 238 +++++++++++++------
.../clients/consumer/MockConsumerTest.java | 2 +-
.../clients/consumer/internals/FetcherTest.java | 36 ++-
.../internals/SubscriptionStateTest.java | 58 ++++-
.../integration/kafka/api/ConsumerTest.scala | 32 ++-
10 files changed, 386 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 23e410b..158e1ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -119,6 +119,16 @@ public interface Consumer<K, V> extends Closeable {
public Map<String, List<PartitionInfo>> listTopics();
/**
+ * @see KafkaConsumer#pause(TopicPartition...)
+ */
+ public void pause(TopicPartition... partitions);
+
+ /**
+ * @see KafkaConsumer#resume(TopicPartition...)
+ */
+ public void resume(TopicPartition... partitions);
+
+ /**
* @see KafkaConsumer#close()
*/
public void close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 923ff99..7851644 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -43,7 +43,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -852,9 +851,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void commit(CommitType commitType, ConsumerCommitCallback callback) {
acquire();
try {
- // need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
- Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
- commit(allConsumed, commitType, callback);
+ commit(subscriptions.allConsumed(), commitType, callback);
} finally {
release();
}
@@ -941,7 +938,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public long position(TopicPartition partition) {
acquire();
try {
- if (!this.subscriptions.assignedPartitions().contains(partition))
+ if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.consumed(partition);
if (offset == null) {
@@ -972,7 +969,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
acquire();
try {
Long committed;
- if (subscriptions.assignedPartitions().contains(partition)) {
+ if (subscriptions.isAssigned(partition)) {
committed = this.subscriptions.committed(partition);
if (committed == null) {
coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1040,6 +1037,45 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
+ /**
+ * Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
+ * any records from these partitions until they have been resumed using {@link #resume(TopicPartition...)}.
+ * Note that this method does not affect partition subscription. In particular, it does not cause a group
+ * rebalance when automatic assignment is used.
+ * @param partitions The partitions which should be paused
+ */
+ @Override
+ public void pause(TopicPartition... partitions) {
+ acquire();
+ try {
+ for (TopicPartition partition: partitions) {
+ log.debug("Pausing partition {}", partition);
+ subscriptions.pause(partition);
+ }
+ } finally {
+ release();
+ }
+ }
+
+ /**
+ * Resume any partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
+ * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
+ * If the partitions were not previously paused, this method is a no-op.
+ * @param partitions The partitions which should be resumed
+ */
+ @Override
+ public void resume(TopicPartition... partitions) {
+ acquire();
+ try {
+ for (TopicPartition partition: partitions) {
+ log.debug("Resuming partition {}", partition);
+ subscriptions.resume(partition);
+ }
+ } finally {
+ release();
+ }
+ }
+
@Override
public void close() {
acquire();
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 5b22fa0..b07e760 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -12,6 +12,12 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -20,12 +26,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.MetricName;
-
/**
* A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
* threadsafe </i>
@@ -83,9 +83,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
ensureNotClosed();
// update the consumed offset
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
- List<ConsumerRecord<K, V>> recs = entry.getValue();
- if (!recs.isEmpty())
- this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
+ if (!subscriptions.isPaused(entry.getKey())) {
+ List<ConsumerRecord<K, V>> recs = entry.getValue();
+ if (!recs.isEmpty())
+ this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
+ }
}
ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
@@ -96,7 +98,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
public synchronized void addRecord(ConsumerRecord<K, V> record) {
ensureNotClosed();
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
- this.subscriptions.assignedPartitions().add(tp);
+ ArrayList<TopicPartition> currentAssigned = new ArrayList<>(this.subscriptions.assignedPartitions());
+ if (!currentAssigned.contains(tp)) {
+ currentAssigned.add(tp);
+ this.subscriptions.changePartitionAssignment(currentAssigned);
+ }
+ subscriptions.seek(tp, record.offset());
List<ConsumerRecord<K, V>> recs = this.records.get(tp);
if (recs == null) {
recs = new ArrayList<ConsumerRecord<K, V>>();
@@ -189,6 +196,18 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
+ public void pause(TopicPartition... partitions) {
+ for (TopicPartition partition : partitions)
+ subscriptions.pause(partition);
+ }
+
+ @Override
+ public void resume(TopicPartition... partitions) {
+ for (TopicPartition partition : partitions)
+ subscriptions.resume(partition);
+ }
+
+ @Override
public synchronized void close() {
ensureNotClosed();
this.closed = true;
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 6026b23..cd5cdc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -119,7 +119,9 @@ public final class Coordinator {
Map<TopicPartition, Long> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
TopicPartition tp = entry.getKey();
- this.subscriptions.committed(tp, entry.getValue());
+ // verify assignment is still active
+ if (subscriptions.isAssigned(tp))
+ this.subscriptions.committed(tp, entry.getValue());
}
this.subscriptions.commitsRefreshed();
}
@@ -459,7 +461,9 @@ public final class Coordinator {
short errorCode = entry.getValue();
if (errorCode == Errors.NONE.code()) {
log.debug("Committed offset {} for partition {}", offset, tp);
- subscriptions.committed(tp, offset);
+ if (subscriptions.isAssigned(tp))
+ // update the local cache only if the partition is still assigned
+ subscriptions.committed(tp, offset);
} else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
coordinatorDead();
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9f71451..956197b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -143,8 +143,7 @@ public class Fetcher<K, V> {
public void updateFetchPositions(Set<TopicPartition> partitions) {
// reset the fetch position to the committed position
for (TopicPartition tp : partitions) {
- // skip if we already have a fetch position
- if (subscriptions.fetched(tp) != null)
+ if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
continue;
// TODO: If there are several offsets to reset, we could submit offset requests in parallel
@@ -222,7 +221,10 @@ public class Fetcher<K, V> {
log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
long offset = listOffset(partition, timestamp);
- this.subscriptions.seek(partition, offset);
+
+ // we might lose the assignment while fetching the offset, so check it is still active
+ if (subscriptions.isAssigned(partition))
+ this.subscriptions.seek(partition, offset);
}
/**
@@ -259,11 +261,15 @@ public class Fetcher<K, V> {
if (this.subscriptions.partitionAssignmentNeeded()) {
return Collections.emptyMap();
} else {
- Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
for (PartitionRecords<K, V> part : this.records) {
+ if (!subscriptions.isFetchable(part.partition)) {
+ log.debug("Ignoring fetched records for {} since it is no longer fetchable", part.partition);
+ continue;
+ }
+
Long consumed = subscriptions.consumed(part.partition);
- if (this.subscriptions.assignedPartitions().contains(part.partition)
- && consumed != null && part.fetchOffset == consumed) {
+ if (consumed != null && part.fetchOffset == consumed) {
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
records = part.records;
@@ -354,8 +360,8 @@ public class Fetcher<K, V> {
*/
private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
// create the fetch info
- Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();
- for (TopicPartition partition : subscriptions.assignedPartitions()) {
+ Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
+ for (TopicPartition partition : subscriptions.fetchablePartitions()) {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
@@ -363,16 +369,17 @@ public class Fetcher<K, V> {
// if there is a leader and no in-flight requests, issue a new fetch
Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
- fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+ fetch = new HashMap<>();
fetchable.put(node, fetch);
}
+
long offset = this.subscriptions.fetched(partition);
fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
}
}
// create the fetches
- Map<Node, FetchRequest> requests = new HashMap<Node, FetchRequest>();
+ Map<Node, FetchRequest> requests = new HashMap<>();
for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
@@ -399,15 +406,7 @@ public class Fetcher<K, V> {
if (!subscriptions.assignedPartitions().contains(tp)) {
log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
} else if (partition.errorCode == Errors.NONE.code()) {
- int bytes = 0;
- ByteBuffer buffer = partition.recordSet;
- MemoryRecords records = MemoryRecords.readableRecords(buffer);
long fetchOffset = request.fetchData().get(tp).offset;
- List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
- for (LogEntry logEntry : records) {
- parsed.add(parseRecord(tp, logEntry));
- bytes += logEntry.size();
- }
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
@@ -422,7 +421,15 @@ public class Fetcher<K, V> {
continue;
}
- if (parsed.size() > 0) {
+ int bytes = 0;
+ ByteBuffer buffer = partition.recordSet;
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
+ for (LogEntry logEntry : records) {
+ parsed.add(parseRecord(tp, logEntry));
+ bytes += logEntry.size();
+ }
+ if (!parsed.isEmpty()) {
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);
this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 8a2cb12..6788ee6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -23,7 +23,25 @@ import java.util.Map;
import java.util.Set;
/**
- * A class for tracking the topics, partitions, and offsets for the consumer
+ * A class for tracking the topics, partitions, and offsets for the consumer. A partition
+ * is "assigned" either directly with {@link #subscribe(TopicPartition)} (manual assignment)
+ * or with {@link #changePartitionAssignment(List)} (automatic assignment).
+ *
+ * Once assigned, the partition is not considered "fetchable" until its initial position has
+ * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
+ * position which is used to set the offset of the next fetch, and a consumed position
+ * which is the last offset that has been returned to the user. You can suspend fetching
+ * from a partition through {@link #pause(TopicPartition)} without affecting the fetched/consumed
+ * offsets. The partition will remain unfetchable until the {@link #resume(TopicPartition)} is
+ * used. You can also query the pause state independently with {@link #isPaused(TopicPartition)}.
+ *
+ * Note that pause state as well as fetch/consumed positions are not preserved when partition
+ * assignment is changed either with {@link #unsubscribe(TopicPartition)} or
+ * {@link #changePartitionAssignment(List)}.
+ *
+ * This class also maintains a cache of the latest commit position for each of the assigned
+ * partitions. This is updated through {@link #committed(TopicPartition, long)} and can be used
+ * to set the initial fetch position (e.g. {@link Fetcher#resetOffset(TopicPartition)}.
*/
public class SubscriptionState {
@@ -34,16 +52,7 @@ public class SubscriptionState {
private final Set<TopicPartition> subscribedPartitions;
/* the list of partitions currently assigned */
- private final Set<TopicPartition> assignedPartitions;
-
- /* the offset exposed to the user */
- private final Map<TopicPartition, Long> consumed;
-
- /* the current point we have fetched up to */
- private final Map<TopicPartition, Long> fetched;
-
- /* the last committed offset for each partition */
- private final Map<TopicPartition, Long> committed;
+ private final Map<TopicPartition, TopicPartitionState> assignedPartitions;
/* do we need to request a partition assignment from the coordinator? */
private boolean needsPartitionAssignment;
@@ -51,28 +60,21 @@ public class SubscriptionState {
/* do we need to request the latest committed offsets from the coordinator? */
private boolean needsFetchCommittedOffsets;
- /* Partitions that need to be reset before fetching */
- private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
-
/* Default offset reset strategy */
- private OffsetResetStrategy offsetResetStrategy;
-
- public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
- this.offsetResetStrategy = offsetResetStrategy;
- this.subscribedTopics = new HashSet<String>();
- this.subscribedPartitions = new HashSet<TopicPartition>();
- this.assignedPartitions = new HashSet<TopicPartition>();
- this.consumed = new HashMap<TopicPartition, Long>();
- this.fetched = new HashMap<TopicPartition, Long>();
- this.committed = new HashMap<TopicPartition, Long>();
+ private final OffsetResetStrategy defaultResetStrategy;
+
+ public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
+ this.defaultResetStrategy = defaultResetStrategy;
+ this.subscribedTopics = new HashSet<>();
+ this.subscribedPartitions = new HashSet<>();
+ this.assignedPartitions = new HashMap<>();
this.needsPartitionAssignment = false;
this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
- this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
}
public void subscribe(String topic) {
- if (this.subscribedPartitions.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
+ if (!this.subscribedPartitions.isEmpty())
+ throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
if (!this.subscribedTopics.contains(topic)) {
this.subscribedTopics.add(topic);
this.needsPartitionAssignment = true;
@@ -95,10 +97,10 @@ public class SubscriptionState {
}
public void subscribe(TopicPartition tp) {
- if (this.subscribedTopics.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
+ if (!this.subscribedTopics.isEmpty())
+ throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
this.subscribedPartitions.add(tp);
- this.assignedPartitions.add(tp);
+ addAssignedPartition(tp);
}
public void unsubscribe(TopicPartition partition) {
@@ -110,17 +112,10 @@ public class SubscriptionState {
private void clearPartition(TopicPartition tp) {
this.assignedPartitions.remove(tp);
- this.committed.remove(tp);
- this.fetched.remove(tp);
- this.consumed.remove(tp);
- this.resetPartitions.remove(tp);
}
public void clearAssignment() {
this.assignedPartitions.clear();
- this.committed.clear();
- this.fetched.clear();
- this.consumed.clear();
this.needsPartitionAssignment = !subscribedTopics().isEmpty();
}
@@ -129,21 +124,26 @@ public class SubscriptionState {
}
public Long fetched(TopicPartition tp) {
- return this.fetched.get(tp);
+ return assignedState(tp).fetched;
}
public void fetched(TopicPartition tp, long offset) {
- if (!this.assignedPartitions.contains(tp))
- throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to.");
- this.fetched.put(tp, offset);
+ assignedState(tp).fetched(offset);
+ }
+
+ private TopicPartitionState assignedState(TopicPartition tp) {
+ TopicPartitionState state = this.assignedPartitions.get(tp);
+ if (state == null)
+ throw new IllegalStateException("No current assignment for partition " + tp);
+ return state;
}
public void committed(TopicPartition tp, long offset) {
- this.committed.put(tp, offset);
+ assignedState(tp).committed(offset);
}
public Long committed(TopicPartition tp) {
- return this.committed.get(tp);
+ return assignedState(tp).committed;
}
public void needRefreshCommits() {
@@ -157,15 +157,22 @@ public class SubscriptionState {
public void commitsRefreshed() {
this.needsFetchCommittedOffsets = false;
}
-
+
public void seek(TopicPartition tp, long offset) {
- fetched(tp, offset);
- consumed(tp, offset);
- resetPartitions.remove(tp);
+ assignedState(tp).seek(offset);
}
public Set<TopicPartition> assignedPartitions() {
- return this.assignedPartitions;
+ return this.assignedPartitions.keySet();
+ }
+
+ public Set<TopicPartition> fetchablePartitions() {
+ Set<TopicPartition> fetchable = new HashSet<>();
+ for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
+ if (entry.getValue().isFetchable())
+ fetchable.add(entry.getKey());
+ }
+ return fetchable;
}
public boolean partitionsAutoAssigned() {
@@ -173,49 +180,52 @@ public class SubscriptionState {
}
public void consumed(TopicPartition tp, long offset) {
- if (!this.assignedPartitions.contains(tp))
- throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to.");
- this.consumed.put(tp, offset);
+ assignedState(tp).consumed(offset);
}
- public Long consumed(TopicPartition partition) {
- return this.consumed.get(partition);
+ public Long consumed(TopicPartition tp) {
+ return assignedState(tp).consumed;
}
public Map<TopicPartition, Long> allConsumed() {
- return this.consumed;
+ Map<TopicPartition, Long> allConsumed = new HashMap<>();
+ for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
+ TopicPartitionState state = entry.getValue();
+ if (state.hasValidPosition)
+ allConsumed.put(entry.getKey(), state.consumed);
+ }
+ return allConsumed;
}
public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
- this.resetPartitions.put(partition, offsetResetStrategy);
- this.fetched.remove(partition);
- this.consumed.remove(partition);
+ assignedState(partition).awaitReset(offsetResetStrategy);
}
public void needOffsetReset(TopicPartition partition) {
- needOffsetReset(partition, offsetResetStrategy);
+ needOffsetReset(partition, defaultResetStrategy);
}
public boolean isOffsetResetNeeded(TopicPartition partition) {
- return resetPartitions.containsKey(partition);
- }
-
- public boolean isOffsetResetNeeded() {
- return !resetPartitions.isEmpty();
+ return assignedState(partition).awaitingReset;
}
public OffsetResetStrategy resetStrategy(TopicPartition partition) {
- return resetPartitions.get(partition);
+ return assignedState(partition).resetStrategy;
}
public boolean hasAllFetchPositions() {
- return this.fetched.size() >= this.assignedPartitions.size();
+ for (TopicPartitionState state : assignedPartitions.values())
+ if (!state.hasValidPosition)
+ return false;
+ return true;
}
public Set<TopicPartition> missingFetchPositions() {
- Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions);
- copy.removeAll(this.fetched.keySet());
- return copy;
+ Set<TopicPartition> missing = new HashSet<>(this.assignedPartitions.keySet());
+ for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet())
+ if (!entry.getValue().hasValidPosition)
+ missing.add(entry.getKey());
+ return missing;
}
public boolean partitionAssignmentNeeded() {
@@ -227,9 +237,99 @@ public class SubscriptionState {
if (!this.subscribedTopics.contains(tp.topic()))
throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
this.clearAssignment();
- this.assignedPartitions.addAll(assignments);
+ for (TopicPartition tp: assignments)
+ addAssignedPartition(tp);
this.needsPartitionAssignment = false;
}
+ public boolean isAssigned(TopicPartition tp) {
+ return assignedPartitions.containsKey(tp);
+ }
+
+ public boolean isPaused(TopicPartition tp) {
+ return isAssigned(tp) && assignedState(tp).paused;
+ }
+
+ public boolean isFetchable(TopicPartition tp) {
+ return isAssigned(tp) && assignedState(tp).isFetchable();
+ }
+
+ public void pause(TopicPartition tp) {
+ assignedState(tp).pause();
+ }
+
+ public void resume(TopicPartition tp) {
+ assignedState(tp).resume();
+ }
+
+ private void addAssignedPartition(TopicPartition tp) {
+ this.assignedPartitions.put(tp, new TopicPartitionState());
+ }
+
+ private static class TopicPartitionState {
+ private Long consumed; // offset exposed to the user
+ private Long fetched; // current fetch position
+ private Long committed; // last committed position
+
+ private boolean hasValidPosition; // whether we have valid consumed and fetched positions
+ private boolean paused; // whether this partition has been paused by the user
+ private boolean awaitingReset; // whether we are awaiting reset
+ private OffsetResetStrategy resetStrategy; // the reset strategy if awaitingReset is set
+
+ public TopicPartitionState() {
+ this.paused = false;
+ this.consumed = null;
+ this.fetched = null;
+ this.committed = null;
+ this.awaitingReset = false;
+ this.hasValidPosition = false;
+ this.resetStrategy = null;
+ }
+
+ private void awaitReset(OffsetResetStrategy strategy) {
+ this.awaitingReset = true;
+ this.resetStrategy = strategy;
+ this.consumed = null;
+ this.fetched = null;
+ this.hasValidPosition = false;
+ }
+
+ private void seek(long offset) {
+ this.consumed = offset;
+ this.fetched = offset;
+ this.awaitingReset = false;
+ this.resetStrategy = null;
+ this.hasValidPosition = true;
+ }
+
+ private void fetched(long offset) {
+ if (!hasValidPosition)
+ throw new IllegalStateException("Cannot update fetch position without valid consumed/fetched positions");
+ this.fetched = offset;
+ }
+
+ private void consumed(long offset) {
+ if (!hasValidPosition)
+ throw new IllegalStateException("Cannot update consumed position without valid consumed/fetched positions");
+ this.consumed = offset;
+ }
+
+ private void committed(Long offset) {
+ this.committed = offset;
+ }
+
+ private void pause() {
+ this.paused = true;
+ }
+
+ private void resume() {
+ this.paused = false;
+ }
+
+ private boolean isFetchable() {
+ return !paused && hasValidPosition;
+ }
+
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 26b6b40..d4da642 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -29,7 +29,7 @@ public class MockConsumerTest {
@Test
public void testSimpleMock() {
- consumer.subscribe("topic");
+ consumer.subscribe("test");
assertEquals(0, consumer.poll(1000).count());
ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 06e2990..56850bb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class FetcherTest {
@@ -99,8 +100,7 @@ public class FetcherTest {
public void testFetchNormal() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
- subscriptions.fetched(tp, 0);
- subscriptions.consumed(tp, 0);
+ subscriptions.seek(tp, 0);
// normal fetch
fetcher.initFetches(cluster);
@@ -121,8 +121,7 @@ public class FetcherTest {
public void testFetchDuringRebalance() {
subscriptions.subscribe(topicName);
subscriptions.changePartitionAssignment(Arrays.asList(tp));
- subscriptions.fetched(tp, 0);
- subscriptions.consumed(tp, 0);
+ subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -136,10 +135,32 @@ public class FetcherTest {
}
@Test
+ public void testInFlightFetchOnPausedPartition() {
+ subscriptions.subscribe(tp);
+ subscriptions.seek(tp, 0);
+
+ fetcher.initFetches(cluster);
+ subscriptions.pause(tp);
+
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+ consumerClient.poll(0);
+ assertNull(fetcher.fetchedRecords().get(tp));
+ }
+
+ @Test
+ public void testFetchOnPausedPartition() {
+ subscriptions.subscribe(tp);
+ subscriptions.seek(tp, 0);
+
+ subscriptions.pause(tp);
+ fetcher.initFetches(cluster);
+ assertTrue(client.requests().isEmpty());
+ }
+
+ @Test
public void testFetchFailed() {
subscriptions.subscribe(tp);
- subscriptions.fetched(tp, 0);
- subscriptions.consumed(tp, 0);
+ subscriptions.seek(tp, 0);
// fetch with not leader
fetcher.initFetches(cluster);
@@ -169,8 +190,7 @@ public class FetcherTest {
@Test
public void testFetchOutOfRange() {
subscriptions.subscribe(tp);
- subscriptions.fetched(tp, 5);
- subscriptions.consumed(tp, 5);
+ subscriptions.seek(tp, 5);
// fetch with out of range
fetcher.initFetches(cluster);
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index c47f3fb..1ba6f7a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static java.util.Arrays.asList;
@@ -37,12 +38,13 @@ public class SubscriptionStateTest {
state.subscribe(tp0);
assertEquals(Collections.singleton(tp0), state.assignedPartitions());
state.committed(tp0, 1);
- state.fetched(tp0, 1);
- state.consumed(tp0, 1);
+ state.seek(tp0, 1);
+ assertTrue(state.isFetchable(tp0));
assertAllPositions(tp0, 1L);
state.unsubscribe(tp0);
assertTrue(state.assignedPartitions().isEmpty());
- assertAllPositions(tp0, null);
+ assertFalse(state.isAssigned(tp0));
+ assertFalse(state.isFetchable(tp0));
}
@Test
@@ -52,10 +54,15 @@ public class SubscriptionStateTest {
assertEquals(5L, (long) state.fetched(tp0));
assertEquals(5L, (long) state.consumed(tp0));
state.needOffsetReset(tp0);
- assertTrue(state.isOffsetResetNeeded());
+ assertFalse(state.isFetchable(tp0));
assertTrue(state.isOffsetResetNeeded(tp0));
assertEquals(null, state.fetched(tp0));
assertEquals(null, state.consumed(tp0));
+
+ // seek should clear the reset and make the partition fetchable
+ state.seek(tp0, 0);
+ assertTrue(state.isFetchable(tp0));
+ assertFalse(state.isOffsetResetNeeded(tp0));
}
@Test
@@ -65,16 +72,28 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
state.changePartitionAssignment(asList(tp0));
+ state.seek(tp0, 1);
state.committed(tp0, 1);
- state.fetched(tp0, 1);
- state.consumed(tp0, 1);
assertAllPositions(tp0, 1L);
state.changePartitionAssignment(asList(tp1));
- assertAllPositions(tp0, null);
+ assertTrue(state.isAssigned(tp1));
+ assertFalse(state.isAssigned(tp0));
+ assertFalse(state.isFetchable(tp1));
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
}
@Test
+ public void partitionPause() {
+ state.subscribe(tp0);
+ state.seek(tp0, 100);
+ assertTrue(state.isFetchable(tp0));
+ state.pause(tp0);
+ assertFalse(state.isFetchable(tp0));
+ state.resume(tp0);
+ assertTrue(state.isFetchable(tp0));
+ }
+
+ @Test
public void topicUnsubscription() {
final String topic = "test";
state.subscribe(topic);
@@ -83,24 +102,37 @@ public class SubscriptionStateTest {
assertTrue(state.partitionsAutoAssigned());
state.changePartitionAssignment(asList(tp0));
state.committed(tp0, 1);
- state.fetched(tp0, 1);
- state.consumed(tp0, 1);
+ state.seek(tp0, 1);
assertAllPositions(tp0, 1L);
state.changePartitionAssignment(asList(tp1));
- assertAllPositions(tp0, null);
+ assertFalse(state.isAssigned(tp0));
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
state.unsubscribe(topic);
assertEquals(0, state.subscribedTopics().size());
assertTrue(state.assignedPartitions().isEmpty());
}
-
- @Test(expected = IllegalArgumentException.class)
+
+ @Test(expected = IllegalStateException.class)
+ public void invalidConsumedPositionUpdate() {
+ state.subscribe("test");
+ state.changePartitionAssignment(asList(tp0));
+ state.consumed(tp0, 0);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void invalidFetchPositionUpdate() {
+ state.subscribe("test");
+ state.changePartitionAssignment(asList(tp0));
+ state.fetched(tp0, 0);
+ }
+
+ @Test(expected = IllegalStateException.class)
public void cantChangeFetchPositionForNonAssignedPartition() {
state.fetched(tp0, 1);
}
- @Test(expected = IllegalArgumentException.class)
+ @Test(expected = IllegalStateException.class)
public void cantChangeConsumedPositionForNonAssignedPartition() {
state.consumed(tp0, 1);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 0c2755f..4ea49f2 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import kafka.utils.{TestUtils, Logging}
import kafka.server.KafkaConfig
@@ -254,6 +254,34 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}
}
+ def testPartitionPauseAndResume() {
+ sendRecords(5)
+ this.consumers(0).subscribe(tp)
+ consumeRecords(this.consumers(0), 5, 0)
+ this.consumers(0).pause(tp)
+ sendRecords(5)
+ assertTrue(this.consumers(0).poll(0).isEmpty)
+ this.consumers(0).resume(tp)
+ consumeRecords(this.consumers(0), 5, 5)
+ }
+
+ def testPauseStateNotPreservedByRebalance() {
+ this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
+ val consumer0 = new KafkaConsumer(this.consumerConfig, null, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+ sendRecords(5)
+ consumer0.subscribe(topic)
+ consumeRecords(consumer0, 5, 0)
+ consumer0.pause(tp)
+
+ // subscribe to a new topic to trigger a rebalance
+ consumer0.subscribe("topic2")
+
+ // after rebalance, our position should be reset and our pause state lost,
+ // so we should be able to consume from the beginning
+ consumeRecords(consumer0, 0, 5)
+ }
+
private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
var callsToAssigned = 0
var callsToRevoked = 0
@@ -264,7 +292,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
info("onPartitionsRevoked called.")
callsToRevoked += 1
- }
+ }
}
private def sendRecords(numRecords: Int): Unit = {