You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/10/03 23:19:35 UTC

kafka git commit: KAFKA-4248; Consumer should rematch regex immediately in subscribe

Repository: kafka
Updated Branches:
  refs/heads/trunk 496594a12 -> a23859e56


KAFKA-4248; Consumer should rematch regex immediately in subscribe

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1954 from hachikuji/rematch-regex-on-subscribe


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a23859e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a23859e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a23859e5

Branch: refs/heads/trunk
Commit: a23859e5686bf93ed7e0d310f949757694d47a1b
Parents: 496594a
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Oct 3 16:19:41 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Oct 3 16:19:41 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |   3 +
 .../consumer/internals/ConsumerCoordinator.java |  31 ++--
 .../clients/consumer/KafkaConsumerTest.java     | 177 ++++++++++++++++---
 3 files changed, 175 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a23859e5/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e263448..b2b4bf0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -855,6 +855,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * </ul>
      *
      * @param pattern Pattern to subscribe to
+     * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
+     *                 subscribed topics
      * @throws IllegalArgumentException If pattern is null
      */
     @Override
@@ -867,6 +869,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
             this.metadata.requestUpdate();
+            this.coordinator.updatePatternSubscription(metadata.fetch());
         } finally {
             release();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a23859e5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 27d6a75..bc77a7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -149,6 +149,21 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         return metadataList;
     }
 
+    public void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = new HashSet<>();
+        // collect every cluster topic matching the subscribed pattern; internal topics are skipped when excludeInternalTopics is set
+        for (String topic : cluster.topics())
+            if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
+                    !(excludeInternalTopics && cluster.internalTopics().contains(topic)))
+                topicsToSubscribe.add(topic);
+        // NOTE(review): getSubscribedPattern() is dereferenced without a null check; presumably callers only invoke this with a pattern subscription in place
+        subscriptions.subscribeFromPattern(topicsToSubscribe);
+
+        // note we still need to update the topics contained in the metadata. Although we have
+        // specified that all topics should be fetched, only those set explicitly will be retained
+        metadata.setTopics(subscriptions.groupSubscription());
+    }
+
     private void addMetadataListener() {
         this.metadata.addListener(new Metadata.Listener() {
             @Override
@@ -157,20 +172,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 if (!cluster.unauthorizedTopics().isEmpty())
                     throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
 
-                if (subscriptions.hasPatternSubscription()) {
-                    final Set<String> topicsToSubscribe = new HashSet<>();
-
-                    for (String topic : cluster.topics())
-                        if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
-                                !(excludeInternalTopics && cluster.internalTopics().contains(topic)))
-                            topicsToSubscribe.add(topic);
-
-                    subscriptions.subscribeFromPattern(topicsToSubscribe);
-
-                    // note we still need to update the topics contained in the metadata. Although we have
-                    // specified that all topics should be fetched, only those set explicitly will be retained
-                    metadata.setTopics(subscriptions.groupSubscription());
-                }
+                if (subscriptions.hasPatternSubscription())
+                    updatePatternSubscription(cluster);
 
                 // check if there are any changes to the metadata which should trigger a rebalance
                 if (subscriptions.partitionsAutoAssigned()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a23859e5/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b1c6962..bf45ee6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchResponse.PartitionData;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -66,9 +67,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
@@ -337,12 +340,14 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 10000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time);
+        client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -376,12 +381,14 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 10000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -415,12 +422,14 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -449,12 +458,14 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -495,12 +506,14 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -528,6 +541,91 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void testRegexSubscription() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 1000;
+        // scenario: subscribe with a pattern compiled from `topic` while the cluster also holds a topic the test expects to stay unsubscribed
+        String unmatchedTopic = "unmatched";
+
+        Time time = new MockTime();
+        // both topics go into the cluster fixture (values are presumably partition counts; confirm in TestUtils.clusterWith)
+        Map<String, Integer> topicMetadata = new HashMap<>();
+        topicMetadata.put(topic, 1);
+        topicMetadata.put(unmatchedTopic, 1);
+
+        Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Node node = cluster.nodes().get(0);
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        PartitionAssignor assignor = new RoundRobinAssignor();
+        // note: metadata.update(cluster, ...) is never called here; the cluster is only supplied via client.prepareMetadataUpdate below
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+        // queue the rebalance responses; the join-group response is only matched by a request subscribing to exactly {topic}
+        prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
+
+        consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
+        // presumably makes the mock client hand this cluster to the metadata during poll; confirm in MockClient
+        client.prepareMetadataUpdate(cluster);
+        // a single poll is expected to leave exactly {topic} subscribed and {tp0} assigned
+        consumer.poll(0);
+        assertEquals(singleton(topic), consumer.subscription());
+        assertEquals(singleton(tp0), consumer.assignment());
+    }
+
+    @Test
+    public void testChangingRegexSubscription() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 1000;
+        PartitionAssignor assignor = new RoundRobinAssignor();
+        // scenario: subscribe with a pattern compiled from `topic`, then re-subscribe with a pattern compiled from otherTopic
+        String otherTopic = "other";
+        TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
+
+        Time time = new MockTime();
+
+        Map<String, Integer> topicMetadata = new HashMap<>();
+        topicMetadata.put(topic, 1);
+        topicMetadata.put(otherTopic, 1);
+
+        Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Node node = cluster.nodes().get(0);
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        // the metadata is seeded with the cluster before the consumer is created
+        metadata.update(cluster, time.milliseconds());
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+        // first rebalance: coordinator is null, so a coordinator lookup response is queued ahead of the join/sync responses
+        Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
+        consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
+
+        client.prepareMetadataUpdate(cluster);
+
+        consumer.poll(0);
+        assertEquals(singleton(topic), consumer.subscription());
+        // switch the subscription to a pattern compiled from the other topic's name
+        consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
+
+        client.prepareMetadataUpdate(cluster);
+        // second rebalance passes the known coordinator, so only join/sync responses are queued (no lookup)
+        prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
+        consumer.poll(0);
+
+        assertEquals(singleton(otherTopic), consumer.subscription());
+    }
+
+    @Test
     public void testWakeupWithFetchDataAvailable() {
         int rebalanceTimeoutMs = 60000;
         int sessionTimeoutMs = 30000;
@@ -538,12 +636,14 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -585,12 +685,14 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -627,16 +729,18 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
         tpCounts.put(topic3, 1);
         Cluster cluster = TestUtils.singletonCluster(tpCounts);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -745,15 +849,17 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
         Cluster cluster = TestUtils.singletonCluster(tpCounts);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -810,15 +916,17 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
         Cluster cluster = TestUtils.singletonCluster(tpCounts);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -872,15 +980,17 @@ public class KafkaConsumerTest {
         int autoCommitIntervalMs = 1000;
 
         Time time = new MockTime();
-        MockClient client = new MockClient(time);
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
         Cluster cluster = TestUtils.singletonCluster(tpCounts);
         Node node = cluster.nodes().get(0);
-        client.setNode(node);
+
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
         metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -973,6 +1083,29 @@ public class KafkaConsumerTest {
         };
     }
 
+    private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
+        if (coordinator == null) {
+            // lookup coordinator: queue the lookup response from `node`, then build the coordinator node (id Integer.MAX_VALUE - node.id(), same host/port)
+            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+            coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+        }
+
+        // join group: the queued response is only matched by a request whose first group protocol subscribes to exactly subscribedTopics
+        client.prepareResponseFrom(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                JoinGroupRequest joinGroupRequest = new JoinGroupRequest(request.request().body());
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(joinGroupRequest.groupProtocols().get(0).metadata());
+                return subscribedTopics.equals(new HashSet<>(subscription.topics()));
+            }
+        }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+
+        // sync group: queue a response built from the given partitions, sent from the coordinator
+        client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE.code()), coordinator);
+        // hand back the coordinator (given or newly built) so a later call can pass it in and skip the lookup
+        return coordinator;
+    }
+
     private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator