You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/27 04:09:05 UTC
kafka git commit: KAFKA-2694: Reformat task id as group id and partition id
Repository: kafka
Updated Branches:
refs/heads/trunk b251bebbc -> e6f9b9e47
KAFKA-2694: Reformat task id as group id and partition id
guozhangwang
* A task id is now a class, `TaskId`, with two fields: a topic group id and a partition id.
* `TopologyBuilder` assigns a topic group id to each topic group. Related methods are changed accordingly.
* A state store uses the partition id part of the task id as the changelog partition id.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #365 from ymatsuda/task_id
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6f9b9e4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6f9b9e4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6f9b9e4
Branch: refs/heads/trunk
Commit: e6f9b9e473f0396743cdfc5236bfd551316fc6f7
Parents: b251beb
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Oct 26 20:14:19 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 26 20:14:19 2015 -0700
----------------------------------------------------------------------
.../streams/kstream/SlidingWindowSupplier.java | 2 +-
.../processor/DefaultPartitionGrouper.java | 46 ++++----------
.../streams/processor/PartitionGrouper.java | 10 ++-
.../streams/processor/ProcessorContext.java | 2 +-
.../apache/kafka/streams/processor/TaskId.java | 66 ++++++++++++++++++++
.../streams/processor/TopologyBuilder.java | 63 +++++++++++++------
.../KafkaStreamingPartitionAssignor.java | 26 ++++----
.../internals/ProcessorContextImpl.java | 8 +--
.../streams/processor/internals/StreamTask.java | 11 ++--
.../processor/internals/StreamThread.java | 19 +++---
.../streams/state/MeteredKeyValueStore.java | 2 +-
.../streams/state/RocksDBKeyValueStore.java | 2 +-
.../processor/DefaultPartitionGrouperTest.java | 37 ++++++-----
.../streams/processor/TopologyBuilderTest.java | 19 +++++-
.../processor/internals/StreamTaskTest.java | 5 +-
.../processor/internals/StreamThreadTest.java | 25 ++++----
.../streams/state/KeyValueStoreTestDriver.java | 5 +-
.../apache/kafka/test/MockProcessorContext.java | 5 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 43 ++++++-------
19 files changed, 247 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
index 1d53123..0cf969f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
@@ -83,7 +83,7 @@ public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
@Override
public void init(ProcessorContext context) {
this.context = context;
- this.partition = context.id();
+ this.partition = context.id().partition;
SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
context.register(this, restoreFunc);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index f87cfa8..7d2188a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -22,47 +22,40 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
public class DefaultPartitionGrouper extends PartitionGrouper {
- public Map<Integer, List<TopicPartition>> partitionGroups(Cluster metadata) {
- Map<Integer, List<TopicPartition>> groups = new HashMap<>();
- List<List<String>> sortedTopicGroups = sort(topicGroups);
+ public Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata) {
+ Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
+
+ for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
+ Integer topicGroupId = entry.getKey();
+ Set<String> topicGroup = entry.getValue();
- int taskId = 0;
- for (List<String> topicGroup : sortedTopicGroups) {
int maxNumPartitions = maxNumPartitions(metadata, topicGroup);
for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) {
- List<TopicPartition> group = new ArrayList<>(topicGroup.size());
+ Set<TopicPartition> group = new HashSet<>(topicGroup.size());
for (String topic : topicGroup) {
if (partitionId < metadata.partitionsForTopic(topic).size()) {
group.add(new TopicPartition(topic, partitionId));
}
}
- groups.put(taskId++, group);
+ groups.put(new TaskId(topicGroupId, partitionId), Collections.unmodifiableSet(group));
}
}
- // make the data unmodifiable, then return
- Map<Integer, List<TopicPartition>> unmodifiableGroups = new HashMap<>();
- for (Map.Entry<Integer, List<TopicPartition>> entry : groups.entrySet()) {
- unmodifiableGroups.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
- }
- return Collections.unmodifiableMap(unmodifiableGroups);
+ return Collections.unmodifiableMap(groups);
}
- protected int maxNumPartitions(Cluster metadata, List<String> topics) {
+ protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
int maxNumPartitions = 0;
for (String topic : topics) {
List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
@@ -77,21 +70,4 @@ public class DefaultPartitionGrouper extends PartitionGrouper {
return maxNumPartitions;
}
- protected List<List<String>> sort(Collection<Set<String>> topicGroups) {
- TreeMap<String, String[]> sortedMap = new TreeMap<>();
-
- for (Set<String> group : topicGroups) {
- String[] arr = group.toArray(new String[group.size()]);
- Arrays.sort(arr);
- sortedMap.put(arr[0], arr);
- }
-
- ArrayList<List<String>> list = new ArrayList(sortedMap.size());
- for (String[] arr : sortedMap.values()) {
- list.add(Arrays.asList(arr));
- }
-
- return list;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 82bb36a..026ec89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -21,14 +21,12 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Set;
public abstract class PartitionGrouper {
- protected Collection<Set<String>> topicGroups;
+ protected Map<Integer, Set<String>> topicGroups;
private KafkaStreamingPartitionAssignor partitionAssignor = null;
@@ -38,9 +36,9 @@ public abstract class PartitionGrouper {
* @param metadata
* @return a map of task ids to groups of partitions
*/
- public abstract Map<Integer, List<TopicPartition>> partitionGroups(Cluster metadata);
+ public abstract Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata);
- public void topicGroups(Collection<Set<String>> topicGroups) {
+ public void topicGroups(Map<Integer, Set<String>> topicGroups) {
this.topicGroups = topicGroups;
}
@@ -48,7 +46,7 @@ public abstract class PartitionGrouper {
this.partitionAssignor = partitionAssignor;
}
- public Set<Integer> taskIds(TopicPartition partition) {
+ public Set<TaskId> taskIds(TopicPartition partition) {
return partitionAssignor.taskIds(partition);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index e7cf257..88ac64e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -30,7 +30,7 @@ public interface ProcessorContext {
*
* @return the task id
*/
- int id();
+ TaskId id();
/**
* Returns the key serializer
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
new file mode 100644
index 0000000..3d474fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+public class TaskId {
+
+ public final int topicGroupId;
+ public final int partition;
+
+ public TaskId(int topicGroupId, int partition) {
+ this.topicGroupId = topicGroupId;
+ this.partition = partition;
+ }
+
+ public String toString() {
+ return topicGroupId + "_" + partition;
+ }
+
+ public static TaskId parse(String string) {
+ int index = string.indexOf('_');
+ if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException();
+
+ try {
+ int topicGroupId = Integer.parseInt(string.substring(0, index));
+ int partition = Integer.parseInt(string.substring(index + 1));
+
+ return new TaskId(topicGroupId, partition);
+ } catch (Exception e) {
+ throw new TaskIdFormatException();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof TaskId) {
+ TaskId other = (TaskId) o;
+ return other.topicGroupId == this.topicGroupId && other.partition == this.partition;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ long n = ((long) topicGroupId << 32) | (long) partition;
+ return (int) (n % 0xFFFFFFFFL);
+ }
+
+ public static class TaskIdFormatException extends RuntimeException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a475e1e..077489c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.QuickUnion;
@@ -53,9 +54,10 @@ public class TopologyBuilder {
private final Set<String> nodeNames = new HashSet<>();
private final Set<String> sourceTopicNames = new HashSet<>();
- private final QuickUnion<String> nodeGroups = new QuickUnion<>();
+ private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
+ private Map<Integer, Set<String>> nodeGroups = null;
private interface NodeFactory {
ProcessorNode build();
@@ -166,7 +168,7 @@ public class TopologyBuilder {
nodeNames.add(name);
nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
nodeToTopics.put(name, topics.clone());
- nodeGroups.add(name);
+ nodeGrouper.add(name);
return this;
}
@@ -247,47 +249,72 @@ public class TopologyBuilder {
nodeNames.add(name);
nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier));
- nodeGroups.add(name);
- nodeGroups.unite(name, parentNames);
+ nodeGrouper.add(name);
+ nodeGrouper.unite(name, parentNames);
return this;
}
/**
- * Returns the topic groups.
+ * Returns the map of topic groups keyed by the group id.
* A topic group is a group of topics in the same task.
*
* @return groups of topic names
*/
- public Collection<Set<String>> topicGroups() {
- List<Set<String>> topicGroups = new ArrayList<>();
+ public Map<Integer, Set<String>> topicGroups() {
+ Map<Integer, Set<String>> topicGroups = new HashMap<>();
- for (Set<String> nodeGroup : generateNodeGroups(nodeGroups)) {
+ if (nodeGroups == null) {
+ nodeGroups = nodeGroups();
+ } else if (!nodeGroups.equals(nodeGroups())) {
+ throw new TopologyException("topology has mutated");
+ }
+
+ for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
Set<String> topicGroup = new HashSet<>();
- for (String node : nodeGroup) {
+ for (String node : entry.getValue()) {
String[] topics = nodeToTopics.get(node);
if (topics != null)
topicGroup.addAll(Arrays.asList(topics));
}
- topicGroups.add(Collections.unmodifiableSet(topicGroup));
+ topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup));
}
- return Collections.unmodifiableList(topicGroups);
+ return Collections.unmodifiableMap(topicGroups);
}
- private Collection<Set<String>> generateNodeGroups(QuickUnion<String> grouping) {
- HashMap<String, Set<String>> nodeGroupMap = new HashMap<>();
+ private Map<Integer, Set<String>> nodeGroups() {
+ HashMap<Integer, Set<String>> nodeGroups = new HashMap<>();
+ HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
- for (String nodeName : nodeNames) {
- String root = grouping.root(nodeName);
- Set<String> nodeGroup = nodeGroupMap.get(root);
+ int nodeGroupId = 0;
+
+ // Go through source nodes first. This makes the group id assignment easy to predict in tests
+ for (String nodeName : Utils.sorted(nodeToTopics.keySet())) {
+ String root = nodeGrouper.root(nodeName);
+ Set<String> nodeGroup = rootToNodeGroup.get(root);
if (nodeGroup == null) {
nodeGroup = new HashSet<>();
- nodeGroupMap.put(root, nodeGroup);
+ rootToNodeGroup.put(root, nodeGroup);
+ nodeGroups.put(nodeGroupId++, nodeGroup);
}
nodeGroup.add(nodeName);
}
- return nodeGroupMap.values();
+ // Go through non-source nodes
+ for (String nodeName : Utils.sorted(nodeNames)) {
+ if (!nodeToTopics.containsKey(nodeName)) {
+ String root = nodeGrouper.root(nodeName);
+ Set<String> nodeGroup = rootToNodeGroup.get(root);
+ if (nodeGroup == null) {
+ nodeGroup = new HashSet<>();
+ rootToNodeGroup.put(root, nodeGroup);
+ nodeGroups.put(nodeGroupId++, nodeGroup);
+ }
+ nodeGroup.add(nodeName);
+ }
+ }
+
+ return nodeGroups;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index ee5bb93..f7b14ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
private PartitionGrouper partitionGrouper;
- private Map<TopicPartition, Set<Integer>> partitionToTaskIds;
+ private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
@Override
public void configure(Map<String, ?> configs) {
@@ -67,29 +68,30 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
- Map<Integer, List<TopicPartition>> partitionGroups = partitionGrouper.partitionGroups(metadata);
+ Map<TaskId, Set<TopicPartition>> partitionGroups = partitionGrouper.partitionGroups(metadata);
String[] clientIds = subscriptions.keySet().toArray(new String[subscriptions.size()]);
- Integer[] taskIds = partitionGroups.keySet().toArray(new Integer[partitionGroups.size()]);
+ TaskId[] taskIds = partitionGroups.keySet().toArray(new TaskId[partitionGroups.size()]);
Map<String, Assignment> assignment = new HashMap<>();
for (int i = 0; i < clientIds.length; i++) {
List<TopicPartition> partitions = new ArrayList<>();
- List<Integer> ids = new ArrayList<>();
+ List<TaskId> ids = new ArrayList<>();
for (int j = i; j < taskIds.length; j += clientIds.length) {
- Integer taskId = taskIds[j];
+ TaskId taskId = taskIds[j];
for (TopicPartition partition : partitionGroups.get(taskId)) {
partitions.add(partition);
ids.add(taskId);
}
}
- ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 4);
+ ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 8);
//version
buf.putInt(1);
// encode task ids
- for (Integer id : ids) {
- buf.putInt(id);
+ for (TaskId id : ids) {
+ buf.putInt(id.topicGroupId);
+ buf.putInt(id.partition);
}
buf.rewind();
assignment.put(clientIds[i], new Assignment(partitions, buf));
@@ -104,19 +106,19 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
ByteBuffer data = assignment.userData();
data.rewind();
- Map<TopicPartition, Set<Integer>> partitionToTaskIds = new HashMap<>();
+ Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
// check version
int version = data.getInt();
if (version == 1) {
for (TopicPartition partition : partitions) {
- Set<Integer> taskIds = partitionToTaskIds.get(partition);
+ Set<TaskId> taskIds = partitionToTaskIds.get(partition);
if (taskIds == null) {
taskIds = new HashSet<>();
partitionToTaskIds.put(partition, taskIds);
}
// decode a task id
- taskIds.add(data.getInt());
+ taskIds.add(new TaskId(data.getInt(), data.getInt()));
}
} else {
KafkaException ex = new KafkaException("unknown assignment data version: " + version);
@@ -126,7 +128,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
this.partitionToTaskIds = partitionToTaskIds;
}
- public Set<Integer> taskIds(TopicPartition partition) {
+ public Set<TaskId> taskIds(TopicPartition partition) {
return partitionToTaskIds.get(partition);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index dfc838c..3c1e059 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
- private final int id;
+ private final TaskId id;
private final StreamTask task;
private final StreamingMetrics metrics;
private final RecordCollector collector;
@@ -49,7 +50,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
private boolean initialized;
@SuppressWarnings("unchecked")
- public ProcessorContextImpl(int id,
+ public ProcessorContextImpl(TaskId id,
StreamTask task,
StreamingConfig config,
RecordCollector collector,
@@ -78,8 +79,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
this.initialized = true;
}
- @Override
- public int id() {
+ public TaskId id() {
return id;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 1de6f9b..d83d721 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class StreamTask implements Punctuator {
private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
- private final int id;
+ private final TaskId id;
private final int maxBufferedSize;
private final Consumer consumer;
@@ -78,7 +79,7 @@ public class StreamTask implements Punctuator {
* @param config the {@link StreamingConfig} specified by the user
* @param metrics the {@link StreamingMetrics} created by the thread
*/
- public StreamTask(int id,
+ public StreamTask(TaskId id,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
@@ -116,8 +117,8 @@ public class StreamTask implements Punctuator {
// create the processor state manager
try {
- File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Integer.toString(id));
- this.stateMgr = new ProcessorStateManager(id, stateFile, restoreConsumer);
+ File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+ this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer);
} catch (IOException e) {
throw new KafkaException("Error while creating the state manager", e);
}
@@ -138,7 +139,7 @@ public class StreamTask implements Punctuator {
this.processorContext.initialized();
}
- public int id() {
+ public TaskId id() {
return id;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e3803a1..abc5c5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +76,7 @@ public class StreamThread extends Thread {
protected final Consumer<byte[], byte[]> consumer;
protected final Consumer<byte[], byte[]> restoreConsumer;
- private final Map<Integer, StreamTask> tasks;
+ private final Map<TaskId, StreamTask> tasks;
private final String clientId;
private final Time time;
private final File stateDir;
@@ -199,7 +200,7 @@ public class StreamThread extends Thread {
running.set(false);
}
- public Map<Integer, StreamTask> tasks() {
+ public Map<TaskId, StreamTask> tasks() {
return Collections.unmodifiableMap(tasks);
}
@@ -375,7 +376,7 @@ public class StreamThread extends Thread {
if (stateDirs != null) {
for (File dir : stateDirs) {
try {
- int id = Integer.parseInt(dir.getName());
+ TaskId id = TaskId.parse(dir.getName());
// try to acquire the exclusive lock on the state directory
FileLock directoryLock = null;
@@ -396,7 +397,7 @@ public class StreamThread extends Thread {
}
}
}
- } catch (NumberFormatException e) {
+ } catch (TaskId.TaskIdFormatException e) {
// there may be some unknown files that sits in the same directory,
// we should ignore these files instead trying to delete them as well
}
@@ -407,7 +408,7 @@ public class StreamThread extends Thread {
}
}
- protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
sensors.taskCreationSensor.record();
return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors);
@@ -415,11 +416,11 @@ public class StreamThread extends Thread {
private void addPartitions(Collection<TopicPartition> assignment) {
- HashMap<Integer, Set<TopicPartition>> partitionsForTask = new HashMap<>();
+ HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
for (TopicPartition partition : assignment) {
- Set<Integer> taskIds = partitionGrouper.taskIds(partition);
- for (Integer taskId : taskIds) {
+ Set<TaskId> taskIds = partitionGrouper.taskIds(partition);
+ for (TaskId taskId : taskIds) {
Set<TopicPartition> partitions = partitionsForTask.get(taskId);
if (partitions == null) {
partitions = new HashSet<>();
@@ -430,7 +431,7 @@ public class StreamThread extends Thread {
}
// create the tasks
- for (Integer taskId : partitionsForTask.keySet()) {
+ for (TaskId taskId : partitionsForTask.keySet()) {
try {
tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 779bc75..a7f4c12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -73,7 +73,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
this.topic = name;
- this.partition = context.id();
+ this.partition = context.id().partition;
this.context = context;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
index 7393bb1..1de345e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
@@ -81,7 +81,7 @@ public class RocksDBKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
public RocksDBStore(String name, ProcessorContext context, Serdes<K, V> serdes) {
this.topic = name;
- this.partition = context.id();
+ this.partition = context.id().partition;
this.context = context;
this.serdes = serdes;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index 388955e..d43fc53 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import org.junit.Test;
@@ -29,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -47,28 +47,35 @@ public class DefaultPartitionGrouperTest {
@Test
public void testGrouping() {
PartitionGrouper grouper = new DefaultPartitionGrouper();
- int taskId;
- Map<Integer, List<TopicPartition>> expected;
+ int topicGroupId;
+ Map<TaskId, Set<TopicPartition>> expected;
+ Map<Integer, Set<String>> topicGroups;
- grouper.topicGroups(mkList(mkSet("topic1"), mkSet("topic2")));
+ topicGroups = new HashMap<>();
+ topicGroups.put(0, mkSet("topic1"));
+ topicGroups.put(1, mkSet("topic2"));
+ grouper.topicGroups(topicGroups);
expected = new HashMap<>();
- taskId = 0;
- expected.put(taskId++, mkList(new TopicPartition("topic1", 0)));
- expected.put(taskId++, mkList(new TopicPartition("topic1", 1)));
- expected.put(taskId++, mkList(new TopicPartition("topic1", 2)));
- expected.put(taskId++, mkList(new TopicPartition("topic2", 0)));
- expected.put(taskId, mkList(new TopicPartition("topic2", 1)));
+ topicGroupId = 0;
+ expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
+ expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
+ expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
+ topicGroupId++;
+ expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
+ expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
assertEquals(expected, grouper.partitionGroups(metadata));
- grouper.topicGroups(mkList(mkSet("topic1", "topic2")));
+ topicGroups = new HashMap<>();
+ topicGroups.put(0, mkSet("topic1", "topic2"));
+ grouper.topicGroups(topicGroups);
expected = new HashMap<>();
- taskId = 0;
- expected.put(taskId++, mkList(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
- expected.put(taskId++, mkList(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
- expected.put(taskId, mkList(new TopicPartition("topic1", 2)));
+ topicGroupId = 0;
+ expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
+ expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
+ expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
assertEquals(expected, grouper.partitionGroups(metadata));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 05d24d3..b77c253 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -25,8 +25,10 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class TopologyBuilderTest {
@@ -121,16 +123,29 @@ public class TopologyBuilderTest {
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
- Collection<Set<String>> topicGroups = builder.topicGroups();
+ Map<Integer, Set<String>> topicGroups = builder.topicGroups();
+
+ Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
+ expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
+ expectedTopicGroups.put(1, set("topic-3", "topic-4"));
+ expectedTopicGroups.put(2, set("topic-5"));
assertEquals(3, topicGroups.size());
- assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("topic-3", "topic-4"), mkSet("topic-5")), new HashSet<>(topicGroups));
+ assertEquals(expectedTopicGroups, topicGroups);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
}
+ private <T> Set<T> set(T... items) {
+ Set<T> set = new HashSet<>();
+ for (T item : items) {
+ set.add(item);
+ }
+ return set;
+ }
+
private <T> List<T> list(T... elems) {
return Arrays.asList(elems);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 92b8684..0b828f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.MockSourceNode;
import org.junit.Test;
import org.junit.Before;
@@ -98,7 +99,7 @@ public class StreamTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+ StreamTask task = new StreamTask(new TaskId(0, 0), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@@ -149,7 +150,7 @@ public class StreamTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+ StreamTask task = new StreamTask(new TaskId(1, 1), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cbb2558..d5011a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test;
@@ -81,11 +82,11 @@ public class StreamThreadTest {
PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));
// task0 is unused
- private final int task1 = 1;
- private final int task2 = 2;
- // task3 is unused
- private final int task4 = 4;
- private final int task5 = 5;
+ private final TaskId task1 = new TaskId(0, 1);
+ private final TaskId task2 = new TaskId(0, 2);
+ private final TaskId task3 = new TaskId(0, 3);
+ private final TaskId task4 = new TaskId(1, 1);
+ private final TaskId task5 = new TaskId(1, 2);
private Properties configProps() {
return new Properties() {
@@ -104,7 +105,7 @@ public class StreamThreadTest {
private static class TestStreamTask extends StreamTask {
public boolean committed = false;
- public TestStreamTask(int id,
+ public TestStreamTask(TaskId id,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
@@ -140,7 +141,7 @@ public class StreamThreadTest {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
@Override
- protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
}
};
@@ -240,9 +241,9 @@ public class StreamThreadTest {
StreamingConfig config = new StreamingConfig(props);
- File stateDir1 = new File(baseDir, "1");
- File stateDir2 = new File(baseDir, "2");
- File stateDir3 = new File(baseDir, "3");
+ File stateDir1 = new File(baseDir, task1.toString());
+ File stateDir2 = new File(baseDir, task2.toString());
+ File stateDir3 = new File(baseDir, task3.toString());
File extraDir = new File(baseDir, "X");
stateDir1.mkdir();
stateDir2.mkdir();
@@ -264,7 +265,7 @@ public class StreamThreadTest {
}
@Override
- protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
}
};
@@ -385,7 +386,7 @@ public class StreamThreadTest {
}
@Override
- protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
}
};
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 4dfa9c2..7e1512a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.test.MockProcessorContext;
@@ -245,8 +246,8 @@ public class KeyValueStoreTestDriver<K, V> {
this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
serdes.valueDeserializer(), recordCollector) {
@Override
- public int id() {
- return 1;
+ public TaskId id() {
+ return new TaskId(0, 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 16df9c5..40f11a0 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import java.io.File;
@@ -82,8 +83,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
}
@Override
- public int id() {
- return 0;
+ public TaskId id() {
+ return new TaskId(0, 0);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 8eb2c62..0c4b1a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamTask;
@@ -59,14 +60,14 @@ import java.util.concurrent.atomic.AtomicLong;
* and {@link org.apache.kafka.clients.producer.Producer}s that read and write raw {@code byte[]} messages. You can either deal
* with messages that have {@code byte[]} keys and values, or you can supply the {@link Serializer}s and {@link Deserializer}s
* that the driver can use to convert the keys and values into objects.
- *
+ *
* <h2>Driver setup</h2>
* <p>
* In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamingConfig}. The
* configuration needs to be representative of what you'd supply to the real topology, so that means including several key
* properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list
* (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
- *
+ *
* <pre>
* StringSerializer strSerializer = new StringSerializer();
* StringDeserializer strDeserializer = new StringDeserializer();
@@ -81,34 +82,34 @@ import java.util.concurrent.atomic.AtomicLong;
* TopologyBuilder builder = ...
* ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
* </pre>
- *
+ *
* <h2>Processing messages</h2>
* <p>
* Your test can supply new input records on any of the topics that the topology's sources consume. Here's an example of an
* input message on the topic named {@code input-topic}:
- *
+ *
* <pre>
* driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
* </pre>
- *
+ *
* Immediately, the driver will pass the input message through to the appropriate source that consumes the named topic,
* and will invoke the processor(s) downstream of the source. If your topology's processors forward messages to sinks,
* your test can then consume these output messages to verify they match the expected outcome. For example, if our topology
* should have generated 2 messages on {@code output-topic-1} and 1 message on {@code output-topic-2}, then our test can
* obtain these messages using the {@link #readOutput(String, Deserializer, Deserializer)} method:
- *
+ *
* <pre>
* ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
* ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
* ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
* </pre>
- *
+ *
* Again, our example topology generates messages with string keys and values, so we supply our string deserializer instance
* for use on both the keys and values. Your test logic can then verify whether these output records are correct.
* <p>
* Finally, when completed, make sure your tests {@link #close()} the driver to release all resources and
* {@link org.apache.kafka.streams.processor.Processor}s.
- *
+ *
* <h2>Processor state</h2>
* <p>
* Some processors use Kafka {@link StateStore state storage}, so this driver class provides the {@link #getStateStore(String)}
@@ -122,7 +123,7 @@ public class ProcessorTopologyTestDriver {
private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
- private final int id;
+ private final TaskId id;
private final ProcessorTopology topology;
private final StreamTask task;
private final MockConsumer<byte[], byte[]> consumer;
@@ -139,7 +140,7 @@ public class ProcessorTopologyTestDriver {
* @param storeNames the optional names of the state stores that are used by the topology
*/
public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) {
- id = 0;
+ id = new TaskId(0, 0);
topology = builder.build();
// Set up the consumer and producer ...
@@ -177,7 +178,7 @@ public class ProcessorTopologyTestDriver {
/**
* Send an input message with the given key and value on the specified topic to the topology, and then commit the messages.
- *
+ *
* @param topicName the name of the topic on which the message is to be sent
* @param key the raw message key
* @param value the raw message value
@@ -207,7 +208,7 @@ public class ProcessorTopologyTestDriver {
/**
* Send an input message with the given key and value on the specified topic to the topology.
- *
+ *
* @param topicName the name of the topic on which the message is to be sent
* @param key the raw message key
* @param value the raw message value
@@ -221,7 +222,7 @@ public class ProcessorTopologyTestDriver {
/**
* Read the next record from the given topic. These records were output by the topology during the previous calls to
* {@link #process(String, byte[], byte[])}.
- *
+ *
* @param topic the name of the topic
* @return the next record on that topic, or null if there is no record available
*/
@@ -234,7 +235,7 @@ public class ProcessorTopologyTestDriver {
/**
* Read the next record from the given topic. These records were output by the topology during the previous calls to
* {@link #process(String, byte[], byte[])}.
- *
+ *
* @param topic the name of the topic
* @param keyDeserializer the deserializer for the key type
* @param valueDeserializer the deserializer for the value type
@@ -259,7 +260,7 @@ public class ProcessorTopologyTestDriver {
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
- *
+ *
* @param name the name of the store
* @return the state store, or null if no store has been registered with the given name
* @see #getKeyValueStore(String)
@@ -276,7 +277,7 @@ public class ProcessorTopologyTestDriver {
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
* <p>
- *
+ *
* @param name the name of the store
* @return the key value store, or null if no {@link KeyValueStore} has been registered with the given name
* @see #getStateStore(String)
@@ -297,12 +298,12 @@ public class ProcessorTopologyTestDriver {
/**
* Utility method that creates the {@link MockConsumer} used for restoring state, which should not be done by this
* driver object unless this method is overwritten with a functional consumer.
- *
+ *
* @param id the ID of the stream task
* @param storeNames the names of the stores that this
* @return the mock consumer; never null
*/
- protected MockConsumer<byte[], byte[]> createRestoreConsumer(int id, String... storeNames) {
+ protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, String... storeNames) {
MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
@Override
public synchronized void seekToEnd(TopicPartition... partitions) {
@@ -327,10 +328,10 @@ public class ProcessorTopologyTestDriver {
// consumer.subscribe(new TopicPartition(topicName, 1));
// Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
List<PartitionInfo> partitionInfos = new ArrayList<>();
- partitionInfos.add(new PartitionInfo(topicName, id, null, null, null));
+ partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
consumer.updatePartitions(topicName, partitionInfos);
- consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id), 0L));
+ consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
}
return consumer;
}
-}
\ No newline at end of file
+}