You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/20 04:59:58 UTC
kafka git commit: KAFKA-3949: Fix race condition when metadata update
arrives during rebalance
Repository: kafka
Updated Branches:
refs/heads/trunk c5d26c482 -> 317c4fded
KAFKA-3949: Fix race condition when metadata update arrives during rebalance
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Vahid Hashemian, Guozhang Wang
Closes #1762 from hachikuji/KAFKA-3949
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/317c4fde
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/317c4fde
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/317c4fde
Branch: refs/heads/trunk
Commit: 317c4fdede41f2026b34f473af1ad69f8ee62a1d
Parents: c5d26c4
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Aug 19 21:59:55 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Aug 19 21:59:55 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/Metadata.java | 13 +-
.../kafka/clients/consumer/KafkaConsumer.java | 10 +-
.../kafka/clients/consumer/MockConsumer.java | 8 +-
.../consumer/internals/AbstractCoordinator.java | 20 +-
.../consumer/internals/ConsumerCoordinator.java | 59 ++--
.../clients/consumer/internals/Fetcher.java | 28 +-
.../consumer/internals/SubscriptionState.java | 100 +++----
.../org/apache/kafka/clients/MockClient.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 280 +++++++++++--------
.../clients/consumer/internals/FetcherTest.java | 65 ++---
.../internals/SubscriptionStateTest.java | 77 +++--
11 files changed, 354 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0fd5d63..a4cf730 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -211,8 +211,15 @@ public final class Metadata {
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);
- // Do this after notifying listeners as subscribed topics' list can be changed by listeners
- this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
+
+ if (this.needMetadataForAllTopics) {
+ // the listener may change the interested topics, which could cause another metadata refresh.
+ // If we have already fetched all topics, however, another fetch should be unnecessary.
+ this.needUpdate = false;
+ this.cluster = getClusterForCurrentTopics(cluster);
+ } else {
+ this.cluster = cluster;
+ }
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
@@ -287,7 +294,7 @@ public final class Metadata {
Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
- Set<String> internalTopics = Collections.<String>emptySet();
+ Set<String> internalTopics = Collections.emptySet();
if (cluster != null) {
internalTopics = cluster.internalTopics();
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ef91302..85d5194 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -801,7 +801,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
- this.subscriptions.subscribe(topics, listener);
+ this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
}
} finally {
@@ -914,7 +914,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
- this.subscriptions.assignFromUser(partitions);
+ this.subscriptions.assignFromUser(new HashSet<>(partitions));
metadata.setTopics(topics);
}
} finally {
@@ -1007,6 +1007,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
long now = time.milliseconds();
client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now);
+
+ // after the long poll, we should check whether the group needs to rebalance
+ // prior to returning data so that the group can stabilize faster
+ if (coordinator.needRejoin())
+ return Collections.emptyMap();
+
return fetcher.fetchedRecords();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9ab4c29..62eb77d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -94,26 +94,26 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
ensureNotClosed();
this.subscriptions.subscribe(pattern, listener);
- List<String> topicsToSubscribe = new ArrayList<>();
+ Set<String> topicsToSubscribe = new HashSet<>();
for (String topic: partitions.keySet()) {
if (pattern.matcher(topic).matches() &&
!subscriptions.subscription().contains(topic))
topicsToSubscribe.add(topic);
}
ensureNotClosed();
- this.subscriptions.changeSubscription(topicsToSubscribe);
+ this.subscriptions.subscribeFromPattern(topicsToSubscribe);
}
@Override
public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
ensureNotClosed();
- this.subscriptions.subscribe(topics, listener);
+ this.subscriptions.subscribe(new HashSet<>(topics), listener);
}
@Override
public void assign(Collection<TopicPartition> partitions) {
ensureNotClosed();
- this.subscriptions.assignFromUser(partitions);
+ this.subscriptions.assignFromUser(new HashSet<>(partitions));
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 690df26..bf6b920 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -270,16 +270,6 @@ public abstract class AbstractCoordinator implements Closeable {
// when sending heartbeats and does not necessarily require us to rejoin the group.
ensureCoordinatorReady();
- if (!needRejoin())
- return;
-
- // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
- // time if the client is woken up before a pending rebalance completes.
- if (needsJoinPrepare) {
- onJoinPrepare(generation.generationId, generation.memberId);
- needsJoinPrepare = false;
- }
-
if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();
@@ -288,6 +278,16 @@ public abstract class AbstractCoordinator implements Closeable {
while (needRejoin()) {
ensureCoordinatorReady();
+ // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
+ // time if the client is woken up before a pending rebalance completes. This must be called
+ // on each iteration of the loop because an event requiring a rebalance (such as a metadata
+ // refresh which changes the matched subscription set) can occur while another rebalance is
+ // still in progress.
+ if (needsJoinPrepare) {
+ onJoinPrepare(generation.generationId, generation.memberId);
+ needsJoinPrepare = false;
+ }
+
// ensure that there are no pending requests to the coordinator. This is important
// in particular to avoid resending a pending JoinGroup request.
if (client.pendingRequestCount(this.coordinator) > 0) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5fee45a..b8df50e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -78,6 +78,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// of offset commit requests, which may be invoked from the heartbeat thread
private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
+ private boolean isLeader = false;
+ private Set<String> joinedSubscription;
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
private long nextAutoCommitDeadline;
@@ -137,9 +139,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
@Override
public List<ProtocolMetadata> metadata() {
+ this.joinedSubscription = subscriptions.subscription();
List<ProtocolMetadata> metadataList = new ArrayList<>();
for (PartitionAssignor assignor : assignors) {
- Subscription subscription = assignor.subscription(subscriptions.subscription());
+ Subscription subscription = assignor.subscription(joinedSubscription);
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
}
@@ -155,26 +158,26 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
if (subscriptions.hasPatternSubscription()) {
- final List<String> topicsToSubscribe = new ArrayList<>();
+ final Set<String> topicsToSubscribe = new HashSet<>();
for (String topic : cluster.topics())
if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
!(excludeInternalTopics && cluster.internalTopics().contains(topic)))
topicsToSubscribe.add(topic);
- subscriptions.changeSubscription(topicsToSubscribe);
+ subscriptions.subscribeFromPattern(topicsToSubscribe);
+
+ // note we still need to update the topics contained in the metadata. Although we have
+ // specified that all topics should be fetched, only those set explicitly will be retained
metadata.setTopics(subscriptions.groupSubscription());
}
// check if there are any changes to the metadata which should trigger a rebalance
if (subscriptions.partitionsAutoAssigned()) {
MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
- if (!snapshot.equals(metadataSnapshot)) {
+ if (!snapshot.equals(metadataSnapshot))
metadataSnapshot = snapshot;
- subscriptions.needReassignment();
- }
}
-
}
});
}
@@ -192,12 +195,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
- // if we were the assignor, then we need to make sure that there have been no metadata updates
- // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
- if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
- subscriptions.needReassignment();
- return;
- }
+ // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
+ if (!isLeader)
+ assignmentSnapshot = null;
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
@@ -246,13 +246,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
now = time.milliseconds();
}
- if (subscriptions.partitionsAutoAssigned() && needRejoin()) {
- // due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
- // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
- // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
- // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
- // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
- // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
+ if (needRejoin()) {
+ // due to a race condition between the initial metadata fetch and the initial rebalance,
+ // we need to ensure that the metadata is fresh before joining initially. This ensures
+ // that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();
@@ -303,6 +300,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// update metadata (if needed) and keep track of the metadata used for assignment so that
// we can check after rebalance completion whether anything has changed
client.ensureFreshMetadata();
+
+ isLeader = true;
assignmentSnapshot = metadataSnapshot;
log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
@@ -339,14 +338,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
listener.getClass().getName(), groupId, e);
}
- assignmentSnapshot = null;
- subscriptions.needReassignment();
+ isLeader = false;
+ subscriptions.resetGroupSubscription();
}
@Override
- protected boolean needRejoin() {
- return subscriptions.partitionsAutoAssigned() &&
- (super.needRejoin() || subscriptions.partitionAssignmentNeeded());
+ public boolean needRejoin() {
+ if (!subscriptions.partitionsAutoAssigned())
+ return false;
+
+ // we need to rejoin if we performed the assignment and metadata has changed
+ if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
+ return true;
+
+ // we need to join if our subscription has changed since the last join
+ if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
+ return true;
+
+ return super.needRejoin();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 84278c6..aa5cdbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -350,26 +350,22 @@ public class Fetcher<K, V> {
* the defaultResetPolicy is NONE
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
- if (this.subscriptions.partitionAssignmentNeeded()) {
- return Collections.emptyMap();
- } else {
- Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
- int recordsRemaining = maxPollRecords;
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
+ int recordsRemaining = maxPollRecords;
- while (recordsRemaining > 0) {
- if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
- CompletedFetch completedFetch = completedFetches.poll();
- if (completedFetch == null)
- break;
+ while (recordsRemaining > 0) {
+ if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+ CompletedFetch completedFetch = completedFetches.poll();
+ if (completedFetch == null)
+ break;
- nextInLineRecords = parseFetchedData(completedFetch);
- } else {
- recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
- }
+ nextInLineRecords = parseFetchedData(completedFetch);
+ } else {
+ recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}
-
- return drained;
}
+
+ return drained;
}
private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index e9b2eb2..6d4c01b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -46,6 +47,8 @@ import java.util.regex.Pattern;
* to set the initial fetch position (e.g. {@link Fetcher#resetOffset(TopicPartition)}.
*/
public class SubscriptionState {
+ private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
+ "Subscription to topics, partitions and pattern are mutually exclusive";
private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
@@ -58,20 +61,17 @@ public class SubscriptionState {
private Pattern subscribedPattern;
/* the list of topics the user has requested */
- private final Set<String> subscription;
+ private Set<String> subscription;
+
+ /* the list of partitions the user has requested */
+ private Set<TopicPartition> userAssignment;
/* the list of topics the group has subscribed to (set only for the leader on join group completion) */
private final Set<String> groupSubscription;
- /* the list of partitions the user has requested */
- private final Set<TopicPartition> userAssignment;
-
/* the list of partitions currently assigned */
private final Map<TopicPartition, TopicPartitionState> assignment;
- /* do we need to request a partition assignment from the coordinator? */
- private boolean needsPartitionAssignment;
-
/* do we need to request the latest committed offsets from the coordinator? */
private boolean needsFetchCommittedOffsets;
@@ -81,8 +81,16 @@ public class SubscriptionState {
/* Listener to be invoked when assignment changes */
private ConsumerRebalanceListener listener;
- private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
- "Subscription to topics, partitions and pattern are mutually exclusive";
+ public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
+ this.defaultResetStrategy = defaultResetStrategy;
+ this.subscription = Collections.emptySet();
+ this.userAssignment = Collections.emptySet();
+ this.assignment = new HashMap<>();
+ this.groupSubscription = new HashSet<>();
+ this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
+ this.subscribedPattern = null;
+ this.subscriptionType = SubscriptionType.NONE;
+ }
/**
* This method sets the subscription type if it is not already set (i.e. when it is NONE),
@@ -97,19 +105,7 @@ public class SubscriptionState {
throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
}
- public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
- this.defaultResetStrategy = defaultResetStrategy;
- this.subscription = new HashSet<>();
- this.userAssignment = new HashSet<>();
- this.assignment = new HashMap<>();
- this.groupSubscription = new HashSet<>();
- this.needsPartitionAssignment = false;
- this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
- this.subscribedPattern = null;
- this.subscriptionType = SubscriptionType.NONE;
- }
-
- public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
+ public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");
@@ -120,12 +116,18 @@ public class SubscriptionState {
changeSubscription(topics);
}
- public void changeSubscription(Collection<String> topicsToSubscribe) {
- if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) {
- this.subscription.clear();
- this.subscription.addAll(topicsToSubscribe);
+ public void subscribeFromPattern(Set<String> topics) {
+ if (subscriptionType != SubscriptionType.AUTO_PATTERN)
+ throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " +
+ subscriptionType);
+
+ changeSubscription(topics);
+ }
+
+ private void changeSubscription(Set<String> topicsToSubscribe) {
+ if (!this.subscription.equals(topicsToSubscribe)) {
+ this.subscription = topicsToSubscribe;
this.groupSubscription.addAll(topicsToSubscribe);
- this.needsPartitionAssignment = true;
// Remove any assigned partitions which are no longer subscribed to
for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
@@ -147,9 +149,11 @@ public class SubscriptionState {
this.groupSubscription.addAll(topics);
}
- public void needReassignment() {
+ /**
+ * Reset the group's subscription to only contain topics subscribed by this consumer.
+ */
+ public void resetGroupSubscription() {
this.groupSubscription.retainAll(subscription);
- this.needsPartitionAssignment = true;
}
/**
@@ -157,34 +161,37 @@ public class SubscriptionState {
* note this is different from {@link #assignFromSubscribed(Collection)}
* whose input partitions are provided from the subscribed topics.
*/
- public void assignFromUser(Collection<TopicPartition> partitions) {
+ public void assignFromUser(Set<TopicPartition> partitions) {
setSubscriptionType(SubscriptionType.USER_ASSIGNED);
- this.userAssignment.clear();
- this.userAssignment.addAll(partitions);
+ if (!this.assignment.keySet().equals(partitions)) {
+ this.userAssignment = partitions;
- for (TopicPartition partition : partitions)
- if (!assignment.containsKey(partition))
- addAssignedPartition(partition);
-
- this.assignment.keySet().retainAll(this.userAssignment);
-
- this.needsPartitionAssignment = false;
- this.needsFetchCommittedOffsets = true;
+ for (TopicPartition partition : partitions)
+ if (!assignment.containsKey(partition))
+ addAssignedPartition(partition);
+ this.assignment.keySet().retainAll(this.userAssignment);
+ this.needsFetchCommittedOffsets = true;
+ }
}
/**
* Change the assignment to the specified partitions returned from the coordinator,
- * note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs
+ * note this is different from {@link #assignFromUser(Set)} which directly set the assignment from user inputs
*/
public void assignFromSubscribed(Collection<TopicPartition> assignments) {
+ if (!this.partitionsAutoAssigned())
+ throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");
+
for (TopicPartition tp : assignments)
if (!this.subscription.contains(tp.topic()))
throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
+
+ // after rebalancing, we always reinitialize the assignment state
this.assignment.clear();
for (TopicPartition tp: assignments)
addAssignedPartition(tp);
- this.needsPartitionAssignment = false;
+ this.needsFetchCommittedOffsets = true;
}
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -202,10 +209,9 @@ public class SubscriptionState {
}
public void unsubscribe() {
- this.subscription.clear();
- this.userAssignment.clear();
+ this.subscription = Collections.emptySet();
+ this.userAssignment = Collections.emptySet();
this.assignment.clear();
- this.needsPartitionAssignment = true;
this.subscribedPattern = null;
this.subscriptionType = SubscriptionType.NONE;
}
@@ -346,10 +352,6 @@ public class SubscriptionState {
return missing;
}
- public boolean partitionAssignmentNeeded() {
- return this.needsPartitionAssignment;
- }
-
public boolean isAssigned(TopicPartition tp) {
return assignment.containsKey(tp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 9fbbb88..8881f82 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -147,7 +147,7 @@ public class MockClient implements KafkaClient {
@Override
public List<ClientResponse> poll(long timeoutMs, long now) {
- List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
+ List<ClientResponse> copy = new ArrayList<>(this.responses);
while (!this.responses.isEmpty()) {
ClientResponse response = this.responses.poll();
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8ec8b75..0486e6c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -66,9 +66,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -148,7 +152,7 @@ public class ConsumerCoordinatorTest {
@Test(expected = GroupAuthorizationException.class)
public void testGroupReadUnauthorized() {
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -206,7 +210,7 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady();
// illegal_generation will cause re-partition
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
subscriptions.assignFromSubscribed(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
@@ -230,7 +234,7 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady();
// illegal_generation will cause re-partition
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
subscriptions.assignFromSubscribed(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
@@ -273,8 +277,7 @@ public class ConsumerCoordinatorTest {
public void testJoinGroupInvalidGroupId() {
final String consumerId = "leader";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
// ensure metadata is up-to-date for leader
metadata.setTopics(Arrays.asList(topicName));
@@ -292,8 +295,7 @@ public class ConsumerCoordinatorTest {
public void testNormalJoinGroupLeader() {
final String consumerId = "leader";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
// ensure metadata is up-to-date for leader
metadata.setTopics(Arrays.asList(topicName));
@@ -304,7 +306,7 @@ public class ConsumerCoordinatorTest {
// normal join group
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
- partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+ partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp)));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -315,23 +317,86 @@ public class ConsumerCoordinatorTest {
sync.generationId() == 1 &&
sync.groupAssignment().containsKey(consumerId);
}
- }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ }, syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertFalse(coordinator.needRejoin());
+ assertEquals(singleton(tp), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(Collections.emptySet(), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ assertEquals(singleton(tp), rebalanceListener.assigned);
+ }
+
+ @Test
+ public void testMetadataRefreshDuringRebalance() {
+ final String consumerId = "leader";
+ final String otherTopicName = "otherTopic";
+ TopicPartition otherPartition = new TopicPartition(otherTopicName, 0);
+
+ subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+ metadata.needMetadataForAllTopics(true);
+ metadata.update(cluster, time.milliseconds());
+
+ assertEquals(singleton(topicName), subscriptions.subscription());
+
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorReady();
+
+ Map<String, List<String>> initialSubscription = singletonMap(consumerId, Arrays.asList(topicName));
+ partitionAssignor.prepare(singletonMap(consumerId, singletonList(tp)));
+
+ // the metadata will be updated in flight with a new topic added
+ final List<String> updatedSubscription = Arrays.asList(topicName, otherTopicName);
+ final Set<String> updatedSubscriptionSet = new HashSet<>(updatedSubscription);
+
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE.code()));
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ final Map<String, Integer> updatedPartitions = new HashMap<>();
+ for (String topic : updatedSubscription)
+ updatedPartitions.put(topic, 1);
+ metadata.update(TestUtils.clusterWith(1, updatedPartitions), time.milliseconds());
+ return true;
+ }
+ }, syncGroupResponse(singletonList(tp), Errors.NONE.code()));
+
+ List<TopicPartition> newAssignment = Arrays.asList(tp, otherPartition);
+ Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
+
+ Map<String, List<String>> updatedSubscriptions = singletonMap(consumerId, Arrays.asList(topicName, otherTopicName));
+ partitionAssignor.prepare(singletonMap(consumerId, newAssignment));
+
+ // we expect to see a second rebalance with the new-found topics
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ JoinGroupRequest join = new JoinGroupRequest(request.request().body());
+ ProtocolMetadata protocolMetadata = join.groupProtocols().iterator().next();
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata.metadata());
+ protocolMetadata.metadata().rewind();
+ return subscription.topics().containsAll(updatedSubscriptionSet);
+ }
+ }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE.code()));
+
+ coordinator.poll(time.milliseconds());
+
+ assertFalse(coordinator.needRejoin());
+ assertEquals(updatedSubscriptionSet, subscriptions.subscription());
+ assertEquals(newAssignmentSet, subscriptions.assignedPartitions());
+ assertEquals(2, rebalanceListener.revokedCount);
+ assertEquals(singleton(tp), rebalanceListener.revoked);
+ assertEquals(2, rebalanceListener.assignedCount);
+ assertEquals(newAssignmentSet, rebalanceListener.assigned);
}
@Test
public void testWakeupDuringJoin() {
final String consumerId = "leader";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
// ensure metadata is up-to-date for leader
metadata.setTopics(Arrays.asList(topicName));
@@ -341,7 +406,7 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady();
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
- partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+ partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp)));
// prepare only the first half of the join and then trigger the wakeup
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
@@ -354,23 +419,22 @@ public class ConsumerCoordinatorTest {
}
// now complete the second half
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertFalse(coordinator.needRejoin());
+ assertEquals(singleton(tp), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(Collections.emptySet(), rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ assertEquals(singleton(tp), rebalanceListener.assigned);
}
@Test
public void testNormalJoinGroupFollower() {
final String consumerId = "consumer";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -385,29 +449,28 @@ public class ConsumerCoordinatorTest {
sync.generationId() == 1 &&
sync.groupAssignment().isEmpty();
}
- }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ }, syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertFalse(coordinator.needRejoin());
+ assertEquals(singleton(tp), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ assertEquals(singleton(tp), rebalanceListener.assigned);
}
@Test
public void testLeaveGroupOnClose() {
final String consumerId = "consumer";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
final AtomicBoolean received = new AtomicBoolean(false);
@@ -428,14 +491,13 @@ public class ConsumerCoordinatorTest {
public void testMaybeLeaveGroup() {
final String consumerId = "consumer";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
final AtomicBoolean received = new AtomicBoolean(false);
@@ -459,8 +521,7 @@ public class ConsumerCoordinatorTest {
public void testUnexpectedErrorOnSyncGroup() {
final String consumerId = "consumer";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -475,8 +536,7 @@ public class ConsumerCoordinatorTest {
public void testUnknownMemberIdOnSyncGroup() {
final String consumerId = "consumer";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -493,20 +553,19 @@ public class ConsumerCoordinatorTest {
return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertFalse(coordinator.needRejoin());
+ assertEquals(singleton(tp), subscriptions.assignedPartitions());
}
@Test
public void testRebalanceInProgressOnSyncGroup() {
final String consumerId = "consumer";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -517,20 +576,19 @@ public class ConsumerCoordinatorTest {
// then let the full join/sync finish successfully
client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertFalse(coordinator.needRejoin());
+ assertEquals(singleton(tp), subscriptions.assignedPartitions());
}
@Test
public void testIllegalGenerationOnSyncGroup() {
final String consumerId = "consumer";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -547,39 +605,45 @@ public class ConsumerCoordinatorTest {
return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertFalse(coordinator.needRejoin());
+ assertEquals(singleton(tp), subscriptions.assignedPartitions());
}
@Test
public void testMetadataChangeTriggersRebalance() {
final String consumerId = "consumer";
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ // ensure metadata is up-to-date for leader
+ metadata.setTopics(Arrays.asList(topicName));
+ metadata.update(cluster, time.milliseconds());
+
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
- client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
+ partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp)));
+
+ // the leader is responsible for picking up metadata changes and forcing a group rebalance
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertFalse(coordinator.needRejoin());
// a new partition is added to the topic
metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
// we should detect the change and ask for reassignment
- assertTrue(subscriptions.partitionAssignmentNeeded());
+ assertTrue(coordinator.needRejoin());
}
-
@Test
public void testUpdateMetadataDuringRebalance() {
final String topic1 = "topic1";
@@ -590,9 +654,8 @@ public class ConsumerCoordinatorTest {
List<String> topics = Arrays.asList(topic1, topic2);
- subscriptions.subscribe(topics, rebalanceListener);
+ subscriptions.subscribe(new HashSet<>(topics), rebalanceListener);
metadata.setTopics(topics);
- subscriptions.needReassignment();
// we only have metadata for one topic initially
metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
@@ -629,7 +692,7 @@ public class ConsumerCoordinatorTest {
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertFalse(coordinator.needRejoin());
assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
}
@@ -640,7 +703,7 @@ public class ConsumerCoordinatorTest {
metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
}
@Test
@@ -650,41 +713,43 @@ public class ConsumerCoordinatorTest {
metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
- assertTrue(subscriptions.partitionAssignmentNeeded());
+ assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
}
@Test
public void testRejoinGroup() {
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ String otherTopic = "otherTopic";
+
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
// join the group once
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
assertEquals(1, rebalanceListener.revokedCount);
+ assertTrue(rebalanceListener.revoked.isEmpty());
assertEquals(1, rebalanceListener.assignedCount);
+ assertEquals(singleton(tp), rebalanceListener.assigned);
// and join the group again
- subscriptions.needReassignment();
+ subscriptions.subscribe(new HashSet<>(Arrays.asList(topicName, otherTopic)), rebalanceListener);
client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
assertEquals(2, rebalanceListener.revokedCount);
- assertEquals(Collections.singleton(tp), rebalanceListener.revoked);
+ assertEquals(singleton(tp), rebalanceListener.revoked);
assertEquals(2, rebalanceListener.assignedCount);
- assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ assertEquals(singleton(tp), rebalanceListener.assigned);
}
@Test
public void testDisconnectInJoin() {
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -693,19 +758,18 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertFalse(coordinator.needRejoin());
+ assertEquals(singleton(tp), subscriptions.assignedPartitions());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
- assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ assertEquals(singleton(tp), rebalanceListener.assigned);
}
@Test(expected = ApiException.class)
public void testInvalidSessionTimeout() {
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -717,7 +781,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetOnly() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -739,14 +803,13 @@ public class ConsumerCoordinatorTest {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
subscriptions.seek(tp, 100);
@@ -765,8 +828,7 @@ public class ConsumerCoordinatorTest {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.needReassignment();
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -776,7 +838,7 @@ public class ConsumerCoordinatorTest {
consumerClient.poll(0);
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
subscriptions.seek(tp, 100);
@@ -793,7 +855,7 @@ public class ConsumerCoordinatorTest {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 100);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
@@ -811,7 +873,7 @@ public class ConsumerCoordinatorTest {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 100);
// no commit initially since coordinator is unknown
@@ -835,7 +897,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetMetadata() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
@@ -866,20 +928,20 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitAfterLeaveGroup() {
// enable auto-assignment
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.subscribe(singleton(topicName), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
coordinator.poll(time.milliseconds());
// now switch to manual assignment
client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());
subscriptions.unsubscribe();
coordinator.maybeLeaveGroup();
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
// the client should not reuse generation/memberId from auto-subscribed generation
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -1048,7 +1110,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1061,7 +1123,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.GROUP_LOAD_IN_PROGRESS.code(), "", 100L));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
@@ -1075,7 +1137,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
@@ -1090,7 +1152,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1122,37 +1184,6 @@ public class ConsumerCoordinatorTest {
}
}
- @Test
- public void testMetadataTopicsExpiryDisabled() {
- final String consumerId = "consumer";
-
- subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- HashSet<String> topics = new HashSet<>();
- topics.add(topicName);
- metadata.setTopics(topics);
- subscriptions.needReassignment();
-
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorReady();
-
- client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
- client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
- coordinator.poll(time.milliseconds());
-
- metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
- assertTrue("Topic not found in metadata", metadata.containsTopic(topicName));
- time.sleep(Metadata.TOPIC_EXPIRY_MS * 2);
- metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
- assertTrue("Topic expired", metadata.containsTopic(topicName));
- metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
- metadata.update(Cluster.empty(), time.milliseconds());
- assertTrue("Topic expired", metadata.containsTopic(topicName));
-
- assertTrue(subscriptions.partitionAssignmentNeeded());
- metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
- assertTrue(subscriptions.partitionAssignmentNeeded());
- }
-
private ConsumerCoordinator buildCoordinator(Metrics metrics,
List<PartitionAssignor> assignors,
boolean excludeInternalTopics,
@@ -1187,7 +1218,8 @@ public class ConsumerCoordinatorTest {
return response.toStruct();
}
- private Struct joinGroupLeaderResponse(int generationId, String memberId,
+ private Struct joinGroupLeaderResponse(int generationId,
+ String memberId,
Map<String, List<String>> subscriptions,
short error) {
Map<String, ByteBuffer> metadata = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5186618..5c0b49c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -66,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -123,7 +124,7 @@ public class FetcherTest {
@Test
public void testFetchNormal() {
List<ConsumerRecord<byte[], byte[]>> records;
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
// normal fetch
@@ -167,7 +168,7 @@ public class FetcherTest {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);
- subscriptions.assignFromUser(Collections.singleton(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 1);
client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -209,7 +210,7 @@ public class FetcherTest {
compressor.close();
buffer.flip();
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
// normal fetch
@@ -230,7 +231,7 @@ public class FetcherTest {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
List<ConsumerRecord<byte[], byte[]>> records;
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 1);
client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -272,7 +273,7 @@ public class FetcherTest {
records.close();
List<ConsumerRecord<byte[], byte[]>> consumerRecords;
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
// normal fetch
@@ -290,7 +291,7 @@ public class FetcherTest {
@Test(expected = RecordTooLargeException.class)
public void testFetchRecordTooLarge() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
// prepare large record
@@ -309,7 +310,7 @@ public class FetcherTest {
@Test
public void testUnauthorizedTopic() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
// resize the limit of the buffer to pretend it is only fetch-size large
@@ -320,20 +321,20 @@ public class FetcherTest {
fetcher.fetchedRecords();
fail("fetchedRecords should have thrown");
} catch (TopicAuthorizationException e) {
- assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
+ assertEquals(singleton(topicName), e.unauthorizedTopics());
}
}
@Test
public void testFetchDuringRebalance() {
- subscriptions.subscribe(Arrays.asList(topicName), listener);
- subscriptions.assignFromSubscribed(Arrays.asList(tp));
+ subscriptions.subscribe(singleton(topicName), listener);
+ subscriptions.assignFromSubscribed(singleton(tp));
subscriptions.seek(tp, 0);
fetcher.sendFetches();
// Now the rebalance happens and fetch positions are cleared
- subscriptions.assignFromSubscribed(Arrays.asList(tp));
+ subscriptions.assignFromSubscribed(singleton(tp));
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
@@ -343,7 +344,7 @@ public class FetcherTest {
@Test
public void testInFlightFetchOnPausedPartition() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
fetcher.sendFetches();
@@ -356,7 +357,7 @@ public class FetcherTest {
@Test
public void testFetchOnPausedPartition() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
subscriptions.pause(tp);
@@ -366,7 +367,7 @@ public class FetcherTest {
@Test
public void testFetchNotLeaderForPartition() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
fetcher.sendFetches();
@@ -378,7 +379,7 @@ public class FetcherTest {
@Test
public void testFetchUnknownTopicOrPartition() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
fetcher.sendFetches();
@@ -390,7 +391,7 @@ public class FetcherTest {
@Test
public void testFetchOffsetOutOfRange() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
fetcher.sendFetches();
@@ -405,7 +406,7 @@ public class FetcherTest {
public void testStaleOutOfRangeError() {
// verify that an out of range error which arrives after a seek
// does not cause us to reset our position or throw an exception
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
fetcher.sendFetches();
@@ -419,7 +420,7 @@ public class FetcherTest {
@Test
public void testFetchedRecordsAfterSeek() {
- subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
+ subscriptionsNoAutoReset.assignFromUser(singleton(tp));
subscriptionsNoAutoReset.seek(tp, 0);
fetcherNoAutoReset.sendFetches();
@@ -432,7 +433,7 @@ public class FetcherTest {
@Test
public void testFetchOffsetOutOfRangeException() {
- subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
+ subscriptionsNoAutoReset.assignFromUser(singleton(tp));
subscriptionsNoAutoReset.seek(tp, 0);
fetcherNoAutoReset.sendFetches();
@@ -452,7 +453,7 @@ public class FetcherTest {
@Test
public void testFetchDisconnected() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
fetcher.sendFetches();
@@ -470,22 +471,22 @@ public class FetcherTest {
public void testUpdateFetchPositionToCommitted() {
// unless a specific reset is expected, the default behavior is to reset to the committed
// position if one is present
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.committed(tp, new OffsetAndMetadata(5));
- fetcher.updateFetchPositions(Collections.singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, subscriptions.position(tp).longValue());
}
@Test
public void testUpdateFetchPositionResetToDefaultOffset() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
// with no commit position, we should reset using the default strategy defined above (EARLIEST)
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
- fetcher.updateFetchPositions(Collections.singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, subscriptions.position(tp).longValue());
@@ -493,12 +494,12 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
- fetcher.updateFetchPositions(Collections.singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, subscriptions.position(tp).longValue());
@@ -506,12 +507,12 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
- fetcher.updateFetchPositions(Collections.singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, subscriptions.position(tp).longValue());
@@ -519,7 +520,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionDisconnect() {
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
// First request gets a disconnect
@@ -529,7 +530,7 @@ public class FetcherTest {
// Next one succeeds
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
- fetcher.updateFetchPositions(Collections.singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, subscriptions.position(tp).longValue());
@@ -567,7 +568,7 @@ public class FetcherTest {
fetcher.getAllTopicMetadata(10L);
fail();
} catch (TopicAuthorizationException e) {
- assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
+ assertEquals(singleton(topicName), e.unauthorizedTopics());
}
}
@@ -600,7 +601,7 @@ public class FetcherTest {
@Test
public void testQuotaMetrics() throws Exception {
List<ConsumerRecord<byte[], byte[]>> records;
- subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 0);
// normal fetch
http://git-wip-us.apache.org/repos/asf/kafka/blob/317c4fde/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 3b4b10e..783f0e6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -16,21 +16,23 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static java.util.Arrays.asList;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class SubscriptionStateTest {
@@ -43,16 +45,15 @@ public class SubscriptionStateTest {
@Test
public void partitionAssignment() {
- state.assignFromUser(Arrays.asList(tp0));
- assertEquals(Collections.singleton(tp0), state.assignedPartitions());
- assertFalse(state.partitionAssignmentNeeded());
+ state.assignFromUser(singleton(tp0));
+ assertEquals(singleton(tp0), state.assignedPartitions());
assertFalse(state.hasAllFetchPositions());
assertTrue(state.refreshCommitsNeeded());
state.committed(tp0, new OffsetAndMetadata(1));
state.seek(tp0, 1);
assertTrue(state.isFetchable(tp0));
assertAllPositions(tp0, 1L);
- state.assignFromUser(Arrays.<TopicPartition>asList());
+ state.assignFromUser(Collections.<TopicPartition>emptySet());
assertTrue(state.assignedPartitions().isEmpty());
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp0));
@@ -60,7 +61,7 @@ public class SubscriptionStateTest {
@Test
public void partitionReset() {
- state.assignFromUser(Arrays.asList(tp0));
+ state.assignFromUser(singleton(tp0));
state.seek(tp0, 5);
assertEquals(5L, (long) state.position(tp0));
state.needOffsetReset(tp0);
@@ -76,9 +77,8 @@ public class SubscriptionStateTest {
@Test
public void topicSubscription() {
- state.subscribe(Arrays.asList(topic), rebalanceListener);
+ state.subscribe(singleton(topic), rebalanceListener);
assertEquals(1, state.subscription().size());
- assertTrue(state.partitionAssignmentNeeded());
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
state.assignFromSubscribed(asList(tp0));
@@ -87,15 +87,14 @@ public class SubscriptionStateTest {
assertAllPositions(tp0, 1L);
state.assignFromSubscribed(asList(tp1));
assertTrue(state.isAssigned(tp1));
- assertFalse(state.partitionAssignmentNeeded());
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp1));
- assertEquals(Collections.singleton(tp1), state.assignedPartitions());
+ assertEquals(singleton(tp1), state.assignedPartitions());
}
@Test
public void partitionPause() {
- state.assignFromUser(Arrays.asList(tp0));
+ state.assignFromUser(singleton(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.pause(tp0);
@@ -106,7 +105,7 @@ public class SubscriptionStateTest {
@Test
public void commitOffsetMetadata() {
- state.assignFromUser(Arrays.asList(tp0));
+ state.assignFromUser(singleton(tp0));
state.committed(tp0, new OffsetAndMetadata(5, "hi"));
assertEquals(5, state.committed(tp0).offset());
@@ -115,7 +114,7 @@ public class SubscriptionStateTest {
@Test(expected = IllegalStateException.class)
public void invalidPositionUpdate() {
- state.subscribe(Arrays.asList(topic), rebalanceListener);
+ state.subscribe(singleton(topic), rebalanceListener);
state.assignFromSubscribed(asList(tp0));
state.position(tp0, 0);
}
@@ -132,32 +131,32 @@ public class SubscriptionStateTest {
@Test(expected = IllegalStateException.class)
public void cantSubscribeTopicAndPattern() {
- state.subscribe(Arrays.asList(topic), rebalanceListener);
+ state.subscribe(singleton(topic), rebalanceListener);
state.subscribe(Pattern.compile(".*"), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePartitionAndPattern() {
- state.assignFromUser(Arrays.asList(tp0));
+ state.assignFromUser(singleton(tp0));
state.subscribe(Pattern.compile(".*"), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePatternAndTopic() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
- state.subscribe(Arrays.asList(topic), rebalanceListener);
+ state.subscribe(singleton(topic), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePatternAndPartition() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
- state.assignFromUser(Arrays.asList(tp0));
+ state.assignFromUser(singleton(tp0));
}
@Test
public void patternSubscription() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
- state.changeSubscription(Arrays.asList(topic, topic1));
+ state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
assertEquals(
"Expected subscribed topics count is incorrect", 2, state.subscription().size());
@@ -165,43 +164,37 @@ public class SubscriptionStateTest {
@Test
public void unsubscribeUserAssignment() {
- state.assignFromUser(Arrays.asList(tp0, tp1));
+ state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
state.unsubscribe();
- state.subscribe(Arrays.asList(topic), rebalanceListener);
- assertEquals(Collections.singleton(topic), state.subscription());
+ state.subscribe(singleton(topic), rebalanceListener);
+ assertEquals(singleton(topic), state.subscription());
}
@Test
public void unsubscribeUserSubscribe() {
- state.subscribe(Arrays.asList(topic), rebalanceListener);
+ state.subscribe(singleton(topic), rebalanceListener);
state.unsubscribe();
- state.assignFromUser(Arrays.asList(tp0));
- assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+ state.assignFromUser(singleton(tp0));
+ assertEquals(singleton(tp0), state.assignedPartitions());
}
@Test
public void unsubscription() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
- state.changeSubscription(Arrays.asList(topic, topic1));
- assertTrue(state.partitionAssignmentNeeded());
-
+ state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
state.assignFromSubscribed(asList(tp1));
- assertEquals(Collections.singleton(tp1), state.assignedPartitions());
- assertFalse(state.partitionAssignmentNeeded());
+ assertEquals(singleton(tp1), state.assignedPartitions());
state.unsubscribe();
assertEquals(0, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
- assertTrue(state.partitionAssignmentNeeded());
- state.assignFromUser(Arrays.asList(tp0));
- assertEquals(Collections.singleton(tp0), state.assignedPartitions());
- assertFalse(state.partitionAssignmentNeeded());
+ state.assignFromUser(singleton(tp0));
+ assertEquals(singleton(tp0), state.assignedPartitions());
state.unsubscribe();
assertEquals(0, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
- assertTrue(state.partitionAssignmentNeeded());
}
private static class MockRebalanceListener implements ConsumerRebalanceListener {