You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/10/03 23:19:35 UTC
kafka git commit: KAFKA-4248;
Consumer should rematch regex immediately in subscribe
Repository: kafka
Updated Branches:
refs/heads/trunk 496594a12 -> a23859e56
KAFKA-4248; Consumer should rematch regex immediately in subscribe
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1954 from hachikuji/rematch-regex-on-subscribe
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a23859e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a23859e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a23859e5
Branch: refs/heads/trunk
Commit: a23859e5686bf93ed7e0d310f949757694d47a1b
Parents: 496594a
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Oct 3 16:19:41 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Oct 3 16:19:41 2016 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 3 +
.../consumer/internals/ConsumerCoordinator.java | 31 ++--
.../clients/consumer/KafkaConsumerTest.java | 177 ++++++++++++++++---
3 files changed, 175 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a23859e5/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e263448..b2b4bf0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -855,6 +855,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* </ul>
*
* @param pattern Pattern to subscribe to
+ * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
+ * subscribed topics
* @throws IllegalArgumentException If pattern is null
*/
@Override
@@ -867,6 +869,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.subscriptions.subscribe(pattern, listener);
this.metadata.needMetadataForAllTopics(true);
this.metadata.requestUpdate();
+ this.coordinator.updatePatternSubscription(metadata.fetch());
} finally {
release();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a23859e5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 27d6a75..bc77a7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -149,6 +149,21 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return metadataList;
}
+ public void updatePatternSubscription(Cluster cluster) {
+ final Set<String> topicsToSubscribe = new HashSet<>();
+
+ for (String topic : cluster.topics())
+ if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
+ !(excludeInternalTopics && cluster.internalTopics().contains(topic)))
+ topicsToSubscribe.add(topic);
+
+ subscriptions.subscribeFromPattern(topicsToSubscribe);
+
+ // note we still need to update the topics contained in the metadata. Although we have
+ // specified that all topics should be fetched, only those set explicitly will be retained
+ metadata.setTopics(subscriptions.groupSubscription());
+ }
+
private void addMetadataListener() {
this.metadata.addListener(new Metadata.Listener() {
@Override
@@ -157,20 +172,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
- if (subscriptions.hasPatternSubscription()) {
- final Set<String> topicsToSubscribe = new HashSet<>();
-
- for (String topic : cluster.topics())
- if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
- !(excludeInternalTopics && cluster.internalTopics().contains(topic)))
- topicsToSubscribe.add(topic);
-
- subscriptions.subscribeFromPattern(topicsToSubscribe);
-
- // note we still need to update the topics contained in the metadata. Although we have
- // specified that all topics should be fetched, only those set explicitly will be retained
- metadata.setTopics(subscriptions.groupSubscription());
- }
+ if (subscriptions.hasPatternSubscription())
+ updatePatternSubscription(cluster);
// check if there are any changes to the metadata which should trigger a rebalance
if (subscriptions.partitionsAutoAssigned()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a23859e5/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b1c6962..bf45ee6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchResponse.PartitionData;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -66,9 +67,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
@@ -337,12 +340,14 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 10000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time);
+ client.setNode(node);
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -376,12 +381,14 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 10000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -415,12 +422,14 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -449,12 +458,14 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -495,12 +506,14 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -528,6 +541,91 @@ public class KafkaConsumerTest {
}
@Test
+ public void testRegexSubscription() {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 3000;
+ int autoCommitIntervalMs = 1000;
+
+ String unmatchedTopic = "unmatched";
+
+ Time time = new MockTime();
+
+ Map<String, Integer> topicMetadata = new HashMap<>();
+ topicMetadata.put(topic, 1);
+ topicMetadata.put(unmatchedTopic, 1);
+
+ Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ Node node = cluster.nodes().get(0);
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
+ PartitionAssignor assignor = new RoundRobinAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+
+ prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
+
+ consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
+
+ client.prepareMetadataUpdate(cluster);
+
+ consumer.poll(0);
+ assertEquals(singleton(topic), consumer.subscription());
+ assertEquals(singleton(tp0), consumer.assignment());
+ }
+
+ @Test
+ public void testChangingRegexSubscription() {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 3000;
+ int autoCommitIntervalMs = 1000;
+ PartitionAssignor assignor = new RoundRobinAssignor();
+
+ String otherTopic = "other";
+ TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
+
+ Time time = new MockTime();
+
+ Map<String, Integer> topicMetadata = new HashMap<>();
+ topicMetadata.put(topic, 1);
+ topicMetadata.put(otherTopic, 1);
+
+ Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ Node node = cluster.nodes().get(0);
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
+
+ metadata.update(cluster, time.milliseconds());
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+
+ Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
+ consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
+
+ client.prepareMetadataUpdate(cluster);
+
+ consumer.poll(0);
+ assertEquals(singleton(topic), consumer.subscription());
+
+ consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
+
+ client.prepareMetadataUpdate(cluster);
+
+ prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
+ consumer.poll(0);
+
+ assertEquals(singleton(otherTopic), consumer.subscription());
+ }
+
+ @Test
public void testWakeupWithFetchDataAvailable() {
int rebalanceTimeoutMs = 60000;
int sessionTimeoutMs = 30000;
@@ -538,12 +636,14 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -585,12 +685,14 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -627,16 +729,18 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
tpCounts.put(topic3, 1);
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -745,15 +849,17 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -810,15 +916,17 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -872,15 +980,17 @@ public class KafkaConsumerTest {
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
- MockClient client = new MockClient(time);
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
- client.setNode(node);
+
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -973,6 +1083,29 @@ public class KafkaConsumerTest {
};
}
+ private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
+ if (coordinator == null) {
+ // lookup coordinator
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+ }
+
+ // join group
+ client.prepareResponseFrom(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ JoinGroupRequest joinGroupRequest = new JoinGroupRequest(request.request().body());
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(joinGroupRequest.groupProtocols().get(0).metadata());
+ return subscribedTopics.equals(new HashSet<>(subscription.topics()));
+ }
+ }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+
+ // sync group
+ client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE.code()), coordinator);
+
+ return coordinator;
+ }
+
private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator