You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/23 06:01:07 UTC
kafka git commit: KAFKA-2686: Reset needsPartitionAssignment in
SubscriptionState.assign()
Repository: kafka
Updated Branches:
refs/heads/trunk bf292a6fa -> aa56dfb9e
KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign()
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Jason Gustafson, Jun Rao
Closes #352 from guozhangwang/K2686
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa56dfb9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa56dfb9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa56dfb9
Branch: refs/heads/trunk
Commit: aa56dfb9e7cea19faa545a13d42d499a6958cbef
Parents: bf292a6
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Oct 22 21:06:10 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 22 21:06:10 2015 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 20 ++++-
.../kafka/clients/consumer/MockConsumer.java | 4 +-
.../consumer/internals/ConsumerCoordinator.java | 2 +-
.../consumer/internals/SubscriptionState.java | 38 +++++----
.../clients/consumer/KafkaConsumerTest.java | 32 ++++++++
.../internals/ConsumerCoordinatorTest.java | 18 ++---
.../clients/consumer/internals/FetcherTest.java | 36 ++++-----
.../internals/SubscriptionStateTest.java | 81 ++++++++++----------
.../clients/producer/KafkaProducerTest.java | 1 -
9 files changed, 142 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cd166f0..06a9239 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -629,6 +629,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* assign partitions. Topic subscriptions are not incremental. This list will replace the current
* assignment (if there is one). Note that it is not possible to combine topic subscription with group management
* with manual partition assignment through {@link #assign(List)}.
+ *
+ * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
+ *
* <p>
* As part of group management, the consumer will keep track of the list of consumers that belong to a particular
* group and will trigger a rebalance operation if one of the following events trigger -
@@ -653,9 +656,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
acquire();
try {
- log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
- this.subscriptions.subscribe(topics, listener);
- metadata.setTopics(subscriptions.groupSubscription());
+ if (topics.isEmpty()) {
+ // treat subscribing to empty topic list as the same as unsubscribing
+ this.unsubscribe();
+ } else {
+ log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
+ this.subscriptions.subscribe(topics, listener);
+ metadata.setTopics(subscriptions.groupSubscription());
+ }
} finally {
release();
}
@@ -666,6 +674,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* assign partitions. Topic subscriptions are not incremental. This list will replace the current
* assignment (if there is one). It is not possible to combine topic subscription with group management
* with manual partition assignment through {@link #assign(List)}.
+ *
+ * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
+ *
* <p>
* This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which
* uses a noop listener. If you need the ability to either seek to particular offsets, you should prefer
@@ -715,6 +726,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void unsubscribe() {
acquire();
try {
+ log.debug("Unsubscribed all topics or patterns and assigned partitions");
this.subscriptions.unsubscribe();
this.coordinator.resetGeneration();
this.metadata.needMetadataForAllTopics(false);
@@ -739,7 +751,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
acquire();
try {
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
- this.subscriptions.assign(partitions);
+ this.subscriptions.assignFromUser(partitions);
Set<String> topics = new HashSet<>();
for (TopicPartition tp : partitions)
topics.add(tp.topic());
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 0242d7b..ed1c1e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -76,7 +76,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
public void rebalance(Collection<TopicPartition> newAssignment) {
// TODO: Rebalance callbacks
this.records.clear();
- this.subscriptions.changePartitionAssignment(newAssignment);
+ this.subscriptions.assignFromSubscribed(newAssignment);
}
@Override
@@ -112,7 +112,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
@Override
public void assign(List<TopicPartition> partitions) {
ensureNotClosed();
- this.subscriptions.assign(partitions);
+ this.subscriptions.assignFromUser(partitions);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fc7e819..d6291bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -169,7 +169,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
subscriptions.needRefreshCommits();
// update partition assignment
- subscriptions.changePartitionAssignment(assignment.partitions());
+ subscriptions.assignFromSubscribed(assignment.partitions());
// give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment);
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6e79a7f..a9ff35f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -28,8 +28,8 @@ import java.util.regex.Pattern;
/**
* A class for tracking the topics, partitions, and offsets for the consumer. A partition
- * is "assigned" either directly with {@link #assign(List)} (manual assignment)
- * or with {@link #changePartitionAssignment(Collection)} (automatic assignment).
+ * is "assigned" either directly with {@link #assignFromUser(Collection)} (manual assignment)
+ * or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription).
*
* Once assigned, the partition is not considered "fetchable" until its initial position has
* been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
@@ -129,12 +129,16 @@ public class SubscriptionState {
}
public void needReassignment() {
- //
this.groupSubscription.retainAll(subscription);
this.needsPartitionAssignment = true;
}
- public void assign(List<TopicPartition> partitions) {
+ /**
+ * Change the assignment to the specified partitions provided by the user,
+ * note this is different from {@link #assignFromSubscribed(Collection)}
+ * whose input partitions are provided from the subscribed topics.
+ */
+ public void assignFromUser(Collection<TopicPartition> partitions) {
if (!this.subscription.isEmpty() || this.subscribedPattern != null)
throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
@@ -146,6 +150,22 @@ public class SubscriptionState {
addAssignedPartition(partition);
this.assignment.keySet().retainAll(this.userAssignment);
+
+ this.needsPartitionAssignment = false;
+ }
+
+ /**
+ * Change the assignment to the specified partitions returned from the coordinator,
+ * note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs
+ */
+ public void assignFromSubscribed(Collection<TopicPartition> assignments) {
+ for (TopicPartition tp : assignments)
+ if (!this.subscription.contains(tp.topic()))
+ throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
+ this.assignment.clear();
+ for (TopicPartition tp: assignments)
+ addAssignedPartition(tp);
+ this.needsPartitionAssignment = false;
}
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -306,16 +326,6 @@ public class SubscriptionState {
return this.needsPartitionAssignment;
}
- public void changePartitionAssignment(Collection<TopicPartition> assignments) {
- for (TopicPartition tp : assignments)
- if (!this.subscription.contains(tp.topic()))
- throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
- this.assignment.clear();
- for (TopicPartition tp: assignments)
- addAssignedPartition(tp);
- this.needsPartitionAssignment = false;
- }
-
public boolean isAssigned(TopicPartition tp) {
return assignment.containsKey(tp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7625218..983c45d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -17,15 +17,20 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.test.MockMetricsReporter;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerTest {
+ private final String topic = "test";
+ private final TopicPartition tp0 = new TopicPartition("test", 0);
+
@Test
public void testConstructorClose() throws Exception {
Properties props = new Properties();
@@ -46,4 +51,31 @@ public class KafkaConsumerTest {
}
Assert.fail("should have caught an exception and returned");
}
+
+ @Test
+ public void testSubscription() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSubscription");
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+
+ consumer.subscribe(Collections.singletonList(topic));
+ Assert.assertEquals(Collections.singleton(topic), consumer.subscription());
+ Assert.assertTrue(consumer.assignment().isEmpty());
+
+ consumer.subscribe(Collections.<String>emptyList());
+ Assert.assertTrue(consumer.subscription().isEmpty());
+ Assert.assertTrue(consumer.assignment().isEmpty());
+
+ consumer.assign(Collections.singletonList(tp0));
+ Assert.assertTrue(consumer.subscription().isEmpty());
+ Assert.assertEquals(Collections.singleton(tp0), consumer.assignment());
+
+ consumer.unsubscribe();
+ Assert.assertTrue(consumer.subscription().isEmpty());
+ Assert.assertTrue(consumer.assignment().isEmpty());
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 93994d7..b20277f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -193,7 +193,7 @@ public class ConsumerCoordinatorTest {
// illegal_generation will cause re-partition
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+ subscriptions.assignFromSubscribed(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
@@ -217,7 +217,7 @@ public class ConsumerCoordinatorTest {
// illegal_generation will cause re-partition
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
- subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+ subscriptions.assignFromSubscribed(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
@@ -413,7 +413,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetOnly() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
@@ -430,7 +430,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetMetadata() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
@@ -473,7 +473,7 @@ public class ConsumerCoordinatorTest {
// now switch to manual assignment
subscriptions.unsubscribe();
coordinator.resetGeneration();
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
// the client should not reuse generation/memberId from auto-subscribed generation
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -612,7 +612,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
coordinator.refreshCommittedOffsetsIfNeeded();
@@ -625,7 +625,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
@@ -639,7 +639,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
@@ -654,7 +654,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needRefreshCommits();
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
coordinator.refreshCommittedOffsetsIfNeeded();
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 8773f8c..957d8f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -110,7 +110,7 @@ public class FetcherTest {
@Test
public void testFetchNormal() {
List<ConsumerRecord<byte[], byte[]>> records;
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
// normal fetch
@@ -130,7 +130,7 @@ public class FetcherTest {
@Test(expected = RecordTooLargeException.class)
public void testFetchRecordTooLarge() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
// prepare large record
@@ -150,13 +150,13 @@ public class FetcherTest {
@Test
public void testFetchDuringRebalance() {
subscriptions.subscribe(Arrays.asList(topicName), listener);
- subscriptions.changePartitionAssignment(Arrays.asList(tp));
+ subscriptions.assignFromSubscribed(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
// Now the rebalance happens and fetch positions are cleared
- subscriptions.changePartitionAssignment(Arrays.asList(tp));
+ subscriptions.assignFromSubscribed(Arrays.asList(tp));
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
@@ -166,7 +166,7 @@ public class FetcherTest {
@Test
public void testInFlightFetchOnPausedPartition() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -179,7 +179,7 @@ public class FetcherTest {
@Test
public void testFetchOnPausedPartition() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
subscriptions.pause(tp);
@@ -189,7 +189,7 @@ public class FetcherTest {
@Test
public void testFetchNotLeaderForPartition() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -201,7 +201,7 @@ public class FetcherTest {
@Test
public void testFetchUnknownTopicOrPartition() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -213,7 +213,7 @@ public class FetcherTest {
@Test
public void testFetchOffsetOutOfRange() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -227,7 +227,7 @@ public class FetcherTest {
@Test
public void testFetchedRecordsAfterSeek() {
- subscriptionsNoAutoReset.assign(Arrays.asList(tp));
+ subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
subscriptionsNoAutoReset.seek(tp, 0);
fetcherNoAutoReset.initFetches(cluster);
@@ -240,7 +240,7 @@ public class FetcherTest {
@Test
public void testFetchOffsetOutOfRangeException() {
- subscriptionsNoAutoReset.assign(Arrays.asList(tp));
+ subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
subscriptionsNoAutoReset.seek(tp, 0);
fetcherNoAutoReset.initFetches(cluster);
@@ -259,7 +259,7 @@ public class FetcherTest {
@Test
public void testFetchDisconnected() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@@ -278,7 +278,7 @@ public class FetcherTest {
public void testUpdateFetchPositionToCommitted() {
// unless a specific reset is expected, the default behavior is to reset to the committed
// position if one is present
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.committed(tp, new OffsetAndMetadata(5));
fetcher.updateFetchPositions(Collections.singleton(tp));
@@ -289,7 +289,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionResetToDefaultOffset() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
// with no commit position, we should reset using the default strategy defined above (EARLIEST)
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
@@ -303,7 +303,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
@@ -317,7 +317,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
@@ -331,7 +331,7 @@ public class FetcherTest {
@Test
public void testUpdateFetchPositionDisconnect() {
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
// First request gets a disconnect
@@ -365,7 +365,7 @@ public class FetcherTest {
@Test
public void testQuotaMetrics() throws Exception {
List<ConsumerRecord<byte[], byte[]>> records;
- subscriptions.assign(Arrays.asList(tp));
+ subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
// normal fetch
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index a0568ad..c5fce61 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -35,19 +35,22 @@ import org.junit.Test;
public class SubscriptionStateTest {
private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+ private final String topic = "test";
+ private final String topic1 = "test1";
private final TopicPartition tp0 = new TopicPartition("test", 0);
private final TopicPartition tp1 = new TopicPartition("test", 1);
private final MockRebalanceListener rebalanceListener = new MockRebalanceListener();
@Test
public void partitionAssignment() {
- state.assign(Arrays.asList(tp0));
+ state.assignFromUser(Arrays.asList(tp0));
assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+ assertFalse(state.partitionAssignmentNeeded());
state.committed(tp0, new OffsetAndMetadata(1));
state.seek(tp0, 1);
assertTrue(state.isFetchable(tp0));
assertAllPositions(tp0, 1L);
- state.assign(Arrays.<TopicPartition>asList());
+ state.assignFromUser(Arrays.<TopicPartition>asList());
assertTrue(state.assignedPartitions().isEmpty());
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp0));
@@ -55,7 +58,7 @@ public class SubscriptionStateTest {
@Test
public void partitionReset() {
- state.assign(Arrays.asList(tp0));
+ state.assignFromUser(Arrays.asList(tp0));
state.seek(tp0, 5);
assertEquals(5L, (long) state.fetched(tp0));
assertEquals(5L, (long) state.consumed(tp0));
@@ -73,16 +76,18 @@ public class SubscriptionStateTest {
@Test
public void topicSubscription() {
- state.subscribe(Arrays.asList("test"), rebalanceListener);
+ state.subscribe(Arrays.asList(topic), rebalanceListener);
assertEquals(1, state.subscription().size());
+ assertTrue(state.partitionAssignmentNeeded());
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
- state.changePartitionAssignment(asList(tp0));
+ state.assignFromSubscribed(asList(tp0));
state.seek(tp0, 1);
state.committed(tp0, new OffsetAndMetadata(1));
assertAllPositions(tp0, 1L);
- state.changePartitionAssignment(asList(tp1));
+ state.assignFromSubscribed(asList(tp1));
assertTrue(state.isAssigned(tp1));
+ assertFalse(state.partitionAssignmentNeeded());
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp1));
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
@@ -90,7 +95,7 @@ public class SubscriptionStateTest {
@Test
public void partitionPause() {
- state.assign(Arrays.asList(tp0));
+ state.assignFromUser(Arrays.asList(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.pause(tp0);
@@ -101,44 +106,24 @@ public class SubscriptionStateTest {
@Test
public void commitOffsetMetadata() {
- state.assign(Arrays.asList(tp0));
+ state.assignFromUser(Arrays.asList(tp0));
state.committed(tp0, new OffsetAndMetadata(5, "hi"));
assertEquals(5, state.committed(tp0).offset());
assertEquals("hi", state.committed(tp0).metadata());
}
- @Test
- public void topicUnsubscription() {
- final String topic = "test";
- state.subscribe(Arrays.asList(topic), rebalanceListener);
- assertEquals(1, state.subscription().size());
- assertTrue(state.assignedPartitions().isEmpty());
- assertTrue(state.partitionsAutoAssigned());
- state.changePartitionAssignment(asList(tp0));
- state.committed(tp0, new OffsetAndMetadata(1));
- state.seek(tp0, 1);
- assertAllPositions(tp0, 1L);
- state.changePartitionAssignment(asList(tp1));
- assertFalse(state.isAssigned(tp0));
- assertEquals(Collections.singleton(tp1), state.assignedPartitions());
-
- state.subscribe(Arrays.<String>asList(), rebalanceListener);
- assertEquals(0, state.subscription().size());
- assertTrue(state.assignedPartitions().isEmpty());
- }
-
@Test(expected = IllegalStateException.class)
public void invalidConsumedPositionUpdate() {
- state.subscribe(Arrays.asList("test"), rebalanceListener);
- state.changePartitionAssignment(asList(tp0));
+ state.subscribe(Arrays.asList(topic), rebalanceListener);
+ state.assignFromSubscribed(asList(tp0));
state.consumed(tp0, 0);
}
@Test(expected = IllegalStateException.class)
public void invalidFetchPositionUpdate() {
- state.subscribe(Arrays.asList("test"), rebalanceListener);
- state.changePartitionAssignment(asList(tp0));
+ state.subscribe(Arrays.asList(topic), rebalanceListener);
+ state.assignFromSubscribed(asList(tp0));
state.fetched(tp0, 0);
}
@@ -160,46 +145,60 @@ public class SubscriptionStateTest {
@Test(expected = IllegalStateException.class)
public void cantSubscribeTopicAndPattern() {
- state.subscribe(Arrays.asList("test"), rebalanceListener);
+ state.subscribe(Arrays.asList(topic), rebalanceListener);
state.subscribe(Pattern.compile(".*"), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePartitionAndPattern() {
- state.assign(Arrays.asList(new TopicPartition("test", 0)));
+ state.assignFromUser(Arrays.asList(tp0));
state.subscribe(Pattern.compile(".*"), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePatternAndTopic() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
- state.subscribe(Arrays.asList("test"), rebalanceListener);
+ state.subscribe(Arrays.asList(topic), rebalanceListener);
}
@Test(expected = IllegalStateException.class)
public void cantSubscribePatternAndPartition() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
- state.assign(Arrays.asList(new TopicPartition("test", 0)));
+ state.assignFromUser(Arrays.asList(tp0));
}
@Test
public void patternSubscription() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
- state.changeSubscription(Arrays.asList("test", "test1"));
+ state.changeSubscription(Arrays.asList(topic, topic1));
assertEquals(
"Expected subscribed topics count is incorrect", 2, state.subscription().size());
}
@Test
- public void patternUnsubscription() {
+ public void unsubscription() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
- state.changeSubscription(Arrays.asList("test", "test1"));
+ state.changeSubscription(Arrays.asList(topic, topic1));
+ assertTrue(state.partitionAssignmentNeeded());
+
+ state.assignFromSubscribed(asList(tp1));
+ assertEquals(Collections.singleton(tp1), state.assignedPartitions());
+ assertFalse(state.partitionAssignmentNeeded());
state.unsubscribe();
+ assertEquals(0, state.subscription().size());
+ assertTrue(state.assignedPartitions().isEmpty());
+ assertTrue(state.partitionAssignmentNeeded());
- assertEquals(
- "Expected subscribed topics count is incorrect", 0, state.subscription().size());
+ state.assignFromUser(Arrays.asList(tp0));
+ assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+ assertFalse(state.partitionAssignmentNeeded());
+
+ state.unsubscribe();
+ assertEquals(0, state.subscription().size());
+ assertTrue(state.assignedPartitions().isEmpty());
+ assertTrue(state.partitionAssignmentNeeded());
}
private static class MockRebalanceListener implements ConsumerRebalanceListener {
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index d1759ce..b044cf4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -31,7 +31,6 @@ import java.util.HashMap;
public class KafkaProducerTest {
-
@Test
public void testConstructorFailureCloseResource() {
Properties props = new Properties();