You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/08 20:03:52 UTC

kafka git commit: KAFKA-4269: Follow up for 0.10.1 branch - update topic subscriptions for regex

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 fc6540023 -> fd2a9b4e2


KAFKA-4269: Follow up for 0.10.1 branch - update topic subscriptions for regex

Author: Bill Bejeck <bb...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2090 from bbejeck/KAFKA-4269_follow_up_for_updating_topic_groups_for_regex_subscription


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd2a9b4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd2a9b4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd2a9b4e

Branch: refs/heads/0.10.1
Commit: fd2a9b4e2f395923fdabf0f4d500e325ec88a358
Parents: fc65400
Author: Bill Bejeck <bb...@gmail.com>
Authored: Tue Nov 8 12:03:48 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 8 12:03:48 2016 -0800

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      | 27 +++++++++++------
 .../internals/StreamPartitionAssignor.java      | 10 +++++--
 .../streams/processor/TopologyBuilderTest.java  | 31 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fd2a9b4e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index f5fd571..81f1f63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -101,6 +103,8 @@ public class TopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
+    private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
+
     private static class StateStoreFactory {
         public final Set<String> users;
 
@@ -831,14 +835,6 @@ public class TopologyBuilder {
     public synchronized Map<Integer, TopicsInfo> topicGroups() {
         Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
-        if (subscriptionUpdates.hasUpdates()) {
-            for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
-                SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
-                //need to update nodeToSourceTopics with topics matched from given regex
-                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
-            }
-        }
-
         if (nodeGroups == null)
             nodeGroups = makeNodeGroups();
 
@@ -897,6 +893,17 @@ public class TopologyBuilder {
         return Collections.unmodifiableMap(topicGroups);
     }
 
+    private void setRegexMatchedTopicsToSourceNodes() {
+        if (subscriptionUpdates.hasUpdates()) {
+            for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
+                SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+                // overwrite this pattern node's entry in nodeToSourceTopics with the topics matched from the updates
+                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+            }
+            log.debug("nodeToSourceTopics {}", nodeToSourceTopics); // logged once, after all pattern nodes are updated
+        }
+    }
+
     private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier, final String name) {
         if (!(supplier instanceof RocksDBWindowStoreSupplier)) {
             return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
@@ -999,7 +1006,9 @@ public class TopologyBuilder {
         return this.topicPattern;
     }
 
-    public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) {
+    public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId) {
+        log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
         this.subscriptionUpdates = subscriptionUpdates;
+        setRegexMatchedTopicsToSourceNodes();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd2a9b4e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 3be9c11..dcba543 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -178,10 +178,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         if (streamThread.builder.sourceTopicPattern() != null) {
             SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
-            log.debug("have {} topics matching regex", topics);
+            log.debug("stream-thread [{}] found {} topics possibly matching regex", streamThread.getName(), topics);
             // update the topic groups with the returned subscription set for regex pattern subscriptions
             subscriptionUpdates.updateTopics(topics);
-            streamThread.builder.updateSubscriptions(subscriptionUpdates);
+            streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
         }
 
         return new Subscription(new ArrayList<>(topics), data.encode());
@@ -669,6 +669,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             return !updatedTopicSubscriptions.isEmpty();
         }
 
+        @Override
+        public String toString() {
+            // render the tracked topic set first, then wrap it in the class-name envelope
+            final String topics = String.valueOf(updatedTopicSubscriptions);
+            return "SubscriptionUpdates{" + "updatedTopicSubscriptions=" + topics + '}';
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd2a9b4e/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 3f45967..d260937 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -27,12 +27,14 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -521,6 +523,7 @@ public class TopologyBuilderTest {
         assertEquals(1, properties.size());
     }
 
+
     @Test(expected = TopologyBuilderException.class)
     public void shouldThroughOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
@@ -583,4 +586,32 @@ public class TopologyBuilderTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source-1", "topic-foo");
+        builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
+        builder.addSource("source-3", Pattern.compile("topic-\\d"));
+        // reach the updatedTopicSubscriptions field reflectively so the test can seed its topics directly
+        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+        updatedTopicsField.setAccessible(true);
+        // the cast below is unchecked, hence the @SuppressWarnings on this method
+        Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+        // topic-A and topic-B match source-2's pattern; topic-3 matches source-3's pattern
+        updatedTopics.add("topic-B");
+        updatedTopics.add("topic-3");
+        updatedTopics.add("topic-A");
+        // null is passed as the thread id; updateSubscriptions only uses it in its debug log message
+        builder.updateSubscriptions(subscriptionUpdates, null);
+        builder.setApplicationId("test-id");
+        // the assertions expect one topic group per source, numbered in the order the sources were added
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
+        assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
+        assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
+        assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
+
+    }
 }