You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/08 20:03:52 UTC
kafka git commit: KAFKA-4269: Follow up for 0.10.1 branch - update topic subscriptions for regex
Repository: kafka
Updated Branches:
refs/heads/0.10.1 fc6540023 -> fd2a9b4e2
KAFKA-4269: Follow up for 0.10.1 branch - update topic subscriptions for regex
Author: Bill Bejeck <bb...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2090 from bbejeck/KAFKA-4269_follow_up_for_updating_topic_groups_for_regex_subscription
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd2a9b4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd2a9b4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd2a9b4e
Branch: refs/heads/0.10.1
Commit: fd2a9b4e2f395923fdabf0f4d500e325ec88a358
Parents: fc65400
Author: Bill Bejeck <bb...@gmail.com>
Authored: Tue Nov 8 12:03:48 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 8 12:03:48 2016 -0800
----------------------------------------------------------------------
.../streams/processor/TopologyBuilder.java | 27 +++++++++++------
.../internals/StreamPartitionAssignor.java | 10 +++++--
.../streams/processor/TopologyBuilderTest.java | 31 ++++++++++++++++++++
3 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fd2a9b4e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index f5fd571..81f1f63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -101,6 +103,8 @@ public class TopologyBuilder {
private Map<Integer, Set<String>> nodeGroups = null;
+ private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
+
private static class StateStoreFactory {
public final Set<String> users;
@@ -831,14 +835,6 @@ public class TopologyBuilder {
public synchronized Map<Integer, TopicsInfo> topicGroups() {
Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
- if (subscriptionUpdates.hasUpdates()) {
- for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
- SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
- //need to update nodeToSourceTopics with topics matched from given regex
- nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
- }
- }
-
if (nodeGroups == null)
nodeGroups = makeNodeGroups();
@@ -897,6 +893,17 @@ public class TopologyBuilder {
return Collections.unmodifiableMap(topicGroups);
}
+ private void setRegexMatchedTopicsToSourceNodes() {
+ if (subscriptionUpdates.hasUpdates()) {
+ for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
+ SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+ //need to update nodeToSourceTopics with topics matched from given regex
+ nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+ log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
+ }
+ }
+ }
+
private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier, final String name) {
if (!(supplier instanceof RocksDBWindowStoreSupplier)) {
return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
@@ -999,7 +1006,9 @@ public class TopologyBuilder {
return this.topicPattern;
}
- public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) {
+ public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId) {
+ log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
this.subscriptionUpdates = subscriptionUpdates;
+ setRegexMatchedTopicsToSourceNodes();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fd2a9b4e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 3be9c11..dcba543 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -178,10 +178,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
if (streamThread.builder.sourceTopicPattern() != null) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
- log.debug("have {} topics matching regex", topics);
+ log.debug("stream-thread [{}] found {} topics possibly matching regex", streamThread.getName(), topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
- streamThread.builder.updateSubscriptions(subscriptionUpdates);
+ streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
}
return new Subscription(new ArrayList<>(topics), data.encode());
@@ -669,6 +669,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
return !updatedTopicSubscriptions.isEmpty();
}
+ @Override
+ public String toString() {
+ return "SubscriptionUpdates{" +
+ "updatedTopicSubscriptions=" + updatedTopicSubscriptions +
+ '}';
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fd2a9b4e/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 3f45967..d260937 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -27,12 +27,14 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.junit.Test;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -521,6 +523,7 @@ public class TopologyBuilderTest {
assertEquals(1, properties.size());
}
+
@Test(expected = TopologyBuilderException.class)
public void shouldThroughOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";
@@ -583,4 +586,32 @@ public class TopologyBuilderTest {
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source-1", "topic-foo");
+ builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
+ builder.addSource("source-3", Pattern.compile("topic-\\d"));
+
+ StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+ Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+ updatedTopicsField.setAccessible(true);
+
+ Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+
+ updatedTopics.add("topic-B");
+ updatedTopics.add("topic-3");
+ updatedTopics.add("topic-A");
+
+ builder.updateSubscriptions(subscriptionUpdates, null);
+ builder.setApplicationId("test-id");
+
+ Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+ assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
+ assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
+ assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
+ assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
+
+ }
}