You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/15 02:09:36 UTC
kafka git commit: MINOR: add internal source topic for tracking
Repository: kafka
Updated Branches:
refs/heads/trunk 4f22705c7 -> a3d3d5379
MINOR: add internal source topic for tracking
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes #775 from guozhangwang/KRepartTopic
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a3d3d537
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a3d3d537
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a3d3d537
Branch: refs/heads/trunk
Commit: a3d3d5379df71e7a2c653d06ebf1b30923dde738
Parents: 4f22705
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Jan 14 17:09:33 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jan 14 17:09:33 2016 -0800
----------------------------------------------------------------------
.../streams/kstream/internals/KTableImpl.java | 5 ++-
.../streams/processor/TopologyBuilder.java | 44 ++++++++++++++++----
.../KafkaStreamingPartitionAssignor.java | 35 +++++++++++-----
.../streams/processor/TopologyBuilderTest.java | 13 +++---
4 files changed, 71 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 7f30f59..9888dff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -46,6 +46,8 @@ import java.util.Set;
*/
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
+ private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+
private static final String FILTER_NAME = "KTABLE-FILTER-";
private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
@@ -258,7 +260,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
String aggregateName = topology.newName(AGGREGATE_NAME);
- String topic = name + "-repartition";
+ String topic = name + REPARTITION_TOPIC_SUFFIX;
ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
@@ -278,6 +280,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
this.enableSendingOldValues();
// send the aggregate key-value pairs to the intermediate topic for partitioning
+ topology.addInternalTopic(topic);
topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
// read the intermediate topic
http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 9cd80a4..d6b63d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.SinkNode;
@@ -58,6 +59,8 @@ public class TopologyBuilder {
private final Set<String> sourceTopicNames = new HashSet<>();
+ private final Set<String> internalTopicNames = new HashSet<>();
+
private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
@@ -152,18 +155,20 @@ public class TopologyBuilder {
public static class TopicsInfo {
public Set<String> sourceTopics;
- public Set<String> stateNames;
+ public Set<String> interSourceTopics;
+ public Set<String> stateChangelogTopics;
- public TopicsInfo(Set<String> sourceTopics, Set<String> stateNames) {
+ public TopicsInfo(Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) {
this.sourceTopics = sourceTopics;
- this.stateNames = stateNames;
+ this.interSourceTopics = interSourceTopics;
+ this.stateChangelogTopics = stateChangelogTopics;
}
@Override
public boolean equals(Object o) {
if (o instanceof TopicsInfo) {
TopicsInfo other = (TopicsInfo) o;
- return other.sourceTopics.equals(this.sourceTopics) && other.stateNames.equals(this.stateNames);
+ return other.sourceTopics.equals(this.sourceTopics) && other.stateChangelogTopics.equals(this.stateChangelogTopics);
} else {
return false;
}
@@ -171,7 +176,7 @@ public class TopologyBuilder {
@Override
public int hashCode() {
- long n = ((long) sourceTopics.hashCode() << 32) | (long) stateNames.hashCode();
+ long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
return (int) (n % 0xFFFFFFFFL);
}
}
@@ -424,6 +429,18 @@ public class TopologyBuilder {
return this;
}
+ /**
+ * Adds an internal topic
+ *
+ * @param topicName the name of the topic
+ * @return this builder instance so methods can be chained together; never null
+ */
+ public final TopologyBuilder addInternalTopic(String topicName) {
+ this.internalTopicNames.add(topicName);
+
+ return this;
+ }
+
private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
if (!stateFactories.containsKey(stateStoreName))
throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
@@ -460,24 +477,33 @@ public class TopologyBuilder {
for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
Set<String> sourceTopics = new HashSet<>();
- Set<String> stateNames = new HashSet<>();
+ Set<String> internalSourceTopics = new HashSet<>();
+ Set<String> stateChangelogTopics = new HashSet<>();
for (String node : entry.getValue()) {
// if the node is a source node, add to the source topics
String[] topics = nodeToTopics.get(node);
- if (topics != null)
+ if (topics != null) {
sourceTopics.addAll(Arrays.asList(topics));
+ // if some of the topics are internal, add them to the internal topics
+ for (String topic : topics) {
+ if (this.internalTopicNames.contains(topic))
+ internalSourceTopics.add(topic);
+ }
+ }
+
// if the node is connected to a state, add to the state topics
for (StateStoreFactory stateFactory : stateFactories.values()) {
if (stateFactory.isInternal && stateFactory.users.contains(node)) {
- stateNames.add(stateFactory.supplier.name());
+ stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
}
}
}
topicGroups.put(entry.getKey(), new TopicsInfo(
Collections.unmodifiableSet(sourceTopics),
- Collections.unmodifiableSet(stateNames)));
+ Collections.unmodifiableSet(internalSourceTopics),
+ Collections.unmodifiableSet(stateChangelogTopics)));
}
return Collections.unmodifiableMap(topicGroups);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 29c67f2..2734f56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -66,10 +66,10 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
private int numStandbyReplicas;
private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
- private Map<String, Set<TaskId>> stateNameToTaskIds;
+ private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
+ private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
private Map<TaskId, Set<TopicPartition>> standbyTasks;
-
// TODO: the following ZK dependency should be removed after KIP-4
private static final String ZK_TOPIC_PATH = "/brokers/topics";
private static final String ZK_BROKER_PATH = "/brokers/ids";
@@ -296,13 +296,24 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
// add tasks to state topic subscribers
- stateNameToTaskIds = new HashMap<>();
+ stateChangelogTopicToTaskIds = new HashMap<>();
+ internalSourceTopicToTaskIds = new HashMap<>();
for (TaskId task : partitionsForTask.keySet()) {
- for (String stateName : topicGroups.get(task.topicGroupId).stateNames) {
- Set<TaskId> tasks = stateNameToTaskIds.get(stateName);
+ for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
+ Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
if (tasks == null) {
tasks = new HashSet<>();
- stateNameToTaskIds.put(stateName, tasks);
+ stateChangelogTopicToTaskIds.put(stateName, tasks);
+ }
+
+ tasks.add(task);
+ }
+
+ for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
+ Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
+ if (tasks == null) {
+ tasks = new HashSet<>();
+ internalSourceTopicToTaskIds.put(topicName, tasks);
}
tasks.add(task);
@@ -363,12 +374,16 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
}
}
- // if ZK is specified, get the tasks for each state topic and validate the topic partitions
+ // if ZK is specified, get the tasks for each state changelog topic and each internal source topic, and validate the topic partitions
if (zkClient != null) {
log.debug("Starting to validate changelog topics in partition assignor.");
- for (Map.Entry<String, Set<TaskId>> entry : stateNameToTaskIds.entrySet()) {
- String topic = streamThread.jobId + "-" + entry.getKey() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
+ Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
+ topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
+ topicToTaskIds.putAll(internalSourceTopicToTaskIds);
+
+ for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
+ String topic = streamThread.jobId + "-" + entry.getKey();
// the expected number of partitions is the max value of TaskId.partition + 1
int numPartitions = 0;
@@ -455,7 +470,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
/* For Test Only */
public Set<TaskId> tasksForState(String stateName) {
- return stateNameToTaskIds.get(stateName);
+ return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
}
public Set<TaskId> tasksForPartition(TopicPartition partition) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index af0b3c9..a2f6ec0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -213,9 +214,9 @@ public class TopologyBuilderTest {
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet()));
- expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet()));
- expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet()));
+ expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+ expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+ expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet()));
assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
@@ -251,9 +252,9 @@ public class TopologyBuilderTest {
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("store-1")));
- expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), mkSet("store-2")));
- expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), mkSet("store-3")));
+ expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+ expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
+ expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);