You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/15 02:09:36 UTC

kafka git commit: MINOR: add internal source topic for tracking

Repository: kafka
Updated Branches:
  refs/heads/trunk 4f22705c7 -> a3d3d5379


MINOR: add internal source topic for tracking

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #775 from guozhangwang/KRepartTopic


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a3d3d537
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a3d3d537
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a3d3d537

Branch: refs/heads/trunk
Commit: a3d3d5379df71e7a2c653d06ebf1b30923dde738
Parents: 4f22705
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Jan 14 17:09:33 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jan 14 17:09:33 2016 -0800

----------------------------------------------------------------------
 .../streams/kstream/internals/KTableImpl.java   |  5 ++-
 .../streams/processor/TopologyBuilder.java      | 44 ++++++++++++++++----
 .../KafkaStreamingPartitionAssignor.java        | 35 +++++++++++-----
 .../streams/processor/TopologyBuilderTest.java  | 13 +++---
 4 files changed, 71 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 7f30f59..9888dff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -46,6 +46,8 @@ import java.util.Set;
  */
 public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
 
+    private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+
     private static final String FILTER_NAME = "KTABLE-FILTER-";
 
     private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
@@ -258,7 +260,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
         String aggregateName = topology.newName(AGGREGATE_NAME);
 
-        String topic = name + "-repartition";
+        String topic = name + REPARTITION_TOPIC_SUFFIX;
 
         ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
         ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
@@ -278,6 +280,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         this.enableSendingOldValues();
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
+        topology.addInternalTopic(topic);
         topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
 
         // read the intermediate topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 9cd80a4..d6b63d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.QuickUnion;
 import org.apache.kafka.streams.processor.internals.SinkNode;
@@ -58,6 +59,8 @@ public class TopologyBuilder {
 
     private final Set<String> sourceTopicNames = new HashSet<>();
 
+    private final Set<String> internalTopicNames = new HashSet<>();
+
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
     private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
     private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
@@ -152,18 +155,20 @@ public class TopologyBuilder {
 
     public static class TopicsInfo {
         public Set<String> sourceTopics;
-        public Set<String> stateNames;
+        public Set<String> interSourceTopics;
+        public Set<String> stateChangelogTopics;
 
-        public TopicsInfo(Set<String> sourceTopics, Set<String> stateNames) {
+        public TopicsInfo(Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) {
             this.sourceTopics = sourceTopics;
-            this.stateNames = stateNames;
+            this.interSourceTopics = interSourceTopics;
+            this.stateChangelogTopics = stateChangelogTopics;
         }
 
         @Override
         public boolean equals(Object o) {
             if (o instanceof TopicsInfo) {
                 TopicsInfo other = (TopicsInfo) o;
-                return other.sourceTopics.equals(this.sourceTopics) && other.stateNames.equals(this.stateNames);
+                return other.sourceTopics.equals(this.sourceTopics) && other.stateChangelogTopics.equals(this.stateChangelogTopics);
             } else {
                 return false;
             }
@@ -171,7 +176,7 @@ public class TopologyBuilder {
 
         @Override
         public int hashCode() {
-            long n = ((long) sourceTopics.hashCode() << 32) | (long) stateNames.hashCode();
+            long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
             return (int) (n % 0xFFFFFFFFL);
         }
     }
@@ -424,6 +429,18 @@ public class TopologyBuilder {
         return this;
     }
 
+    /**
+     * Adds an internal topic
+     *
+     * @param topicName the name of the topic
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addInternalTopic(String topicName) {
+        this.internalTopicNames.add(topicName);
+
+        return this;
+    }
+
     private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
         if (!stateFactories.containsKey(stateStoreName))
             throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
@@ -460,24 +477,33 @@ public class TopologyBuilder {
 
         for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
             Set<String> sourceTopics = new HashSet<>();
-            Set<String> stateNames = new HashSet<>();
+            Set<String> internalSourceTopics = new HashSet<>();
+            Set<String> stateChangelogTopics = new HashSet<>();
             for (String node : entry.getValue()) {
                 // if the node is a source node, add to the source topics
                 String[] topics = nodeToTopics.get(node);
-                if (topics != null)
+                if (topics != null) {
                     sourceTopics.addAll(Arrays.asList(topics));
 
+                    // if some of the topics are internal, add them to the internal topics
+                    for (String topic : topics) {
+                        if (this.internalTopicNames.contains(topic))
+                            internalSourceTopics.add(topic);
+                    }
+                }
+
                 // if the node is connected to a state, add to the state topics
                 for (StateStoreFactory stateFactory : stateFactories.values()) {
 
                     if (stateFactory.isInternal && stateFactory.users.contains(node)) {
-                        stateNames.add(stateFactory.supplier.name());
+                        stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
                     }
                 }
             }
             topicGroups.put(entry.getKey(), new TopicsInfo(
                     Collections.unmodifiableSet(sourceTopics),
-                    Collections.unmodifiableSet(stateNames)));
+                    Collections.unmodifiableSet(internalSourceTopics),
+                    Collections.unmodifiableSet(stateChangelogTopics)));
         }
 
         return Collections.unmodifiableMap(topicGroups);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 29c67f2..2734f56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -66,10 +66,10 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
     private int numStandbyReplicas;
     private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
     private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-    private Map<String, Set<TaskId>> stateNameToTaskIds;
+    private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
+    private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
 
-
     // TODO: the following ZK dependency should be removed after KIP-4
     private static final String ZK_TOPIC_PATH = "/brokers/topics";
     private static final String ZK_BROKER_PATH = "/brokers/ids";
@@ -296,13 +296,24 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
 
         // add tasks to state topic subscribers
-        stateNameToTaskIds = new HashMap<>();
+        stateChangelogTopicToTaskIds = new HashMap<>();
+        internalSourceTopicToTaskIds = new HashMap<>();
         for (TaskId task : partitionsForTask.keySet()) {
-            for (String stateName : topicGroups.get(task.topicGroupId).stateNames) {
-                Set<TaskId> tasks = stateNameToTaskIds.get(stateName);
+            for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
+                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
                 if (tasks == null) {
                     tasks = new HashSet<>();
-                    stateNameToTaskIds.put(stateName, tasks);
+                    stateChangelogTopicToTaskIds.put(stateName, tasks);
+                }
+
+                tasks.add(task);
+            }
+
+            for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
+                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
+                if (tasks == null) {
+                    tasks = new HashSet<>();
+                    internalSourceTopicToTaskIds.put(topicName, tasks);
                 }
 
                 tasks.add(task);
@@ -363,12 +374,16 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
             }
         }
 
-        // if ZK is specified, get the tasks for each state topic and validate the topic partitions
+        // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
         if (zkClient != null) {
             log.debug("Starting to validate changelog topics in partition assignor.");
 
-            for (Map.Entry<String, Set<TaskId>> entry : stateNameToTaskIds.entrySet()) {
-                String topic = streamThread.jobId + "-" + entry.getKey() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
+            Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
+            topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
+            topicToTaskIds.putAll(internalSourceTopicToTaskIds);
+
+            for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
+                String topic = streamThread.jobId + "-" + entry.getKey();
 
                 // the expected number of partitions is the max value of TaskId.partition + 1
                 int numPartitions = 0;
@@ -455,7 +470,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
 
     /* For Test Only */
     public Set<TaskId> tasksForState(String stateName) {
-        return stateNameToTaskIds.get(stateName);
+        return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
     }
 
     public Set<TaskId> tasksForPartition(TopicPartition partition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index af0b3c9..a2f6ec0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -213,9 +214,9 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet()));
-        expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet()));
-        expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet()));
+        expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet()));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -251,9 +252,9 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("store-1")));
-        expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), mkSet("store-2")));
-        expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), mkSet("store-3")));
+        expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+        expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+        expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);