Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/27 04:09:05 UTC

kafka git commit: KAFKA-2694: Reformat task id as group id and partition id

Repository: kafka
Updated Branches:
  refs/heads/trunk b251bebbc -> e6f9b9e47


KAFKA-2694: Reformat task id as group id and partition id

guozhangwang

* A task id is now a class, ```TaskId```, with topic group id and partition id fields (a short usage sketch follows below).
* ```TopologyBuilder``` assigns a topic group id to a topic group. Related methods are changed accordingly.
* A state store uses the partition id part of the task id as the change log partition id.
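
For illustration only, a minimal sketch of how the new id behaves (the group and partition numbers are made up, and ```TaskIdExample``` is not part of the patch):

```java
import org.apache.kafka.streams.processor.TaskId;

public class TaskIdExample {
    public static void main(String[] args) {
        TaskId id = new TaskId(2, 5);        // topic group 2, partition 5
        String name = id.toString();         // "2_5" -- also used as the state directory name
        TaskId parsed = TaskId.parse(name);  // round-trips back to an equal TaskId
        System.out.println(parsed.equals(id));  // true
    }
}
```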

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #365 from ymatsuda/task_id


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6f9b9e4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6f9b9e4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6f9b9e4

Branch: refs/heads/trunk
Commit: e6f9b9e473f0396743cdfc5236bfd551316fc6f7
Parents: b251beb
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Oct 26 20:14:19 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 26 20:14:19 2015 -0700

----------------------------------------------------------------------
 .../streams/kstream/SlidingWindowSupplier.java  |  2 +-
 .../processor/DefaultPartitionGrouper.java      | 46 ++++----------
 .../streams/processor/PartitionGrouper.java     | 10 ++-
 .../streams/processor/ProcessorContext.java     |  2 +-
 .../apache/kafka/streams/processor/TaskId.java  | 66 ++++++++++++++++++++
 .../streams/processor/TopologyBuilder.java      | 63 +++++++++++++------
 .../KafkaStreamingPartitionAssignor.java        | 26 ++++----
 .../internals/ProcessorContextImpl.java         |  8 +--
 .../streams/processor/internals/StreamTask.java | 11 ++--
 .../processor/internals/StreamThread.java       | 19 +++---
 .../streams/state/MeteredKeyValueStore.java     |  2 +-
 .../streams/state/RocksDBKeyValueStore.java     |  2 +-
 .../processor/DefaultPartitionGrouperTest.java  | 37 ++++++-----
 .../streams/processor/TopologyBuilderTest.java  | 19 +++++-
 .../processor/internals/StreamTaskTest.java     |  5 +-
 .../processor/internals/StreamThreadTest.java   | 25 ++++----
 .../streams/state/KeyValueStoreTestDriver.java  |  5 +-
 .../apache/kafka/test/MockProcessorContext.java |  5 +-
 .../kafka/test/ProcessorTopologyTestDriver.java | 43 ++++++-------
 19 files changed, 247 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
index 1d53123..0cf969f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
@@ -83,7 +83,7 @@ public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
         @Override
         public void init(ProcessorContext context) {
             this.context = context;
-            this.partition = context.id();
+            this.partition = context.id().partition;
             SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
             context.register(this, restoreFunc);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index f87cfa8..7d2188a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -22,47 +22,40 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 public class DefaultPartitionGrouper extends PartitionGrouper {
 
-    public Map<Integer, List<TopicPartition>> partitionGroups(Cluster metadata) {
-        Map<Integer, List<TopicPartition>> groups = new HashMap<>();
-        List<List<String>> sortedTopicGroups = sort(topicGroups);
+    public Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata) {
+        Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
+
+        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
+            Integer topicGroupId = entry.getKey();
+            Set<String> topicGroup = entry.getValue();
 
-        int taskId = 0;
-        for (List<String> topicGroup : sortedTopicGroups) {
             int maxNumPartitions = maxNumPartitions(metadata, topicGroup);
 
             for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) {
-                List<TopicPartition> group = new ArrayList<>(topicGroup.size());
+                Set<TopicPartition> group = new HashSet<>(topicGroup.size());
 
                 for (String topic : topicGroup) {
                     if (partitionId < metadata.partitionsForTopic(topic).size()) {
                         group.add(new TopicPartition(topic, partitionId));
                     }
                 }
-                groups.put(taskId++, group);
+                groups.put(new TaskId(topicGroupId, partitionId), Collections.unmodifiableSet(group));
             }
         }
 
-        // make the data unmodifiable, then return
-        Map<Integer, List<TopicPartition>> unmodifiableGroups = new HashMap<>();
-        for (Map.Entry<Integer, List<TopicPartition>> entry : groups.entrySet()) {
-            unmodifiableGroups.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
-        }
-        return Collections.unmodifiableMap(unmodifiableGroups);
+        return Collections.unmodifiableMap(groups);
     }
 
-    protected int maxNumPartitions(Cluster metadata, List<String> topics) {
+    protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
         int maxNumPartitions = 0;
         for (String topic : topics) {
             List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
@@ -77,21 +70,4 @@ public class DefaultPartitionGrouper extends PartitionGrouper {
         return maxNumPartitions;
     }
 
-    protected List<List<String>> sort(Collection<Set<String>> topicGroups) {
-        TreeMap<String, String[]> sortedMap = new TreeMap<>();
-
-        for (Set<String> group : topicGroups) {
-            String[] arr = group.toArray(new String[group.size()]);
-            Arrays.sort(arr);
-            sortedMap.put(arr[0], arr);
-        }
-
-        ArrayList<List<String>> list = new ArrayList(sortedMap.size());
-        for (String[] arr : sortedMap.values()) {
-            list.add(Arrays.asList(arr));
-        }
-
-        return list;
-    }
-
 }
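
As a concrete reading of the loop above, suppose topic group 0 contains topic1 (3 partitions) and topic2 (2 partitions), as in the updated DefaultPartitionGrouperTest below; the grouper emits one task per partition id up to the larger partition count:

```java
// maxNumPartitions(metadata, {topic1, topic2}) == 3, so group 0 yields three tasks:
//   TaskId(0, 0) -> { topic1-0, topic2-0 }
//   TaskId(0, 1) -> { topic1-1, topic2-1 }
//   TaskId(0, 2) -> { topic1-2 }          // topic2 has no partition 2, so it is skipped
```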

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 82bb36a..026ec89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -21,14 +21,12 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
 
-import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 public abstract class PartitionGrouper {
 
-    protected Collection<Set<String>> topicGroups;
+    protected Map<Integer, Set<String>> topicGroups;
 
     private KafkaStreamingPartitionAssignor partitionAssignor = null;
 
@@ -38,9 +36,9 @@ public abstract class PartitionGrouper {
      * @param metadata
      * @return a map of task ids to groups of partitions
      */
-    public abstract Map<Integer, List<TopicPartition>> partitionGroups(Cluster metadata);
+    public abstract Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata);
 
-    public void topicGroups(Collection<Set<String>> topicGroups) {
+    public void topicGroups(Map<Integer, Set<String>> topicGroups) {
         this.topicGroups = topicGroups;
     }
 
@@ -48,7 +46,7 @@ public abstract class PartitionGrouper {
         this.partitionAssignor = partitionAssignor;
     }
 
-    public Set<Integer> taskIds(TopicPartition partition) {
+    public Set<TaskId> taskIds(TopicPartition partition) {
         return partitionAssignor.taskIds(partition);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index e7cf257..88ac64e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -30,7 +30,7 @@ public interface ProcessorContext {
      *
      * @return the task id
      */
-    int id();
+    TaskId id();
 
     /**
      * Returns the key serializer

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
new file mode 100644
index 0000000..3d474fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+public class TaskId {
+
+    public final int topicGroupId;
+    public final int partition;
+
+    public TaskId(int topicGroupId, int partition) {
+        this.topicGroupId = topicGroupId;
+        this.partition = partition;
+    }
+
+    public String toString() {
+        return topicGroupId + "_" + partition;
+    }
+
+    public static TaskId parse(String string) {
+        int index = string.indexOf('_');
+        if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException();
+
+        try {
+            int topicGroupId = Integer.parseInt(string.substring(0, index));
+            int partition = Integer.parseInt(string.substring(index + 1));
+
+            return new TaskId(topicGroupId, partition);
+        } catch (Exception e) {
+            throw new TaskIdFormatException();
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof TaskId) {
+            TaskId other = (TaskId) o;
+            return other.topicGroupId == this.topicGroupId && other.partition == this.partition;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        long n = ((long) topicGroupId << 32) | (long) partition;
+        return (int) (n % 0xFFFFFFFFL);
+    }
+
+    public static class TaskIdFormatException extends RuntimeException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a475e1e..077489c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.QuickUnion;
@@ -53,9 +54,10 @@ public class TopologyBuilder {
     private final Set<String> nodeNames = new HashSet<>();
     private final Set<String> sourceTopicNames = new HashSet<>();
 
-    private final QuickUnion<String> nodeGroups = new QuickUnion<>();
+    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
     private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
     private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
+    private Map<Integer, Set<String>> nodeGroups = null;
 
     private interface NodeFactory {
         ProcessorNode build();
@@ -166,7 +168,7 @@ public class TopologyBuilder {
         nodeNames.add(name);
         nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
         nodeToTopics.put(name, topics.clone());
-        nodeGroups.add(name);
+        nodeGrouper.add(name);
 
         return this;
     }
@@ -247,47 +249,72 @@ public class TopologyBuilder {
 
         nodeNames.add(name);
         nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier));
-        nodeGroups.add(name);
-        nodeGroups.unite(name, parentNames);
+        nodeGrouper.add(name);
+        nodeGrouper.unite(name, parentNames);
         return this;
     }
 
     /**
-     * Returns the topic groups.
+     * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
      *
      * @return groups of topic names
      */
-    public Collection<Set<String>> topicGroups() {
-        List<Set<String>> topicGroups = new ArrayList<>();
+    public Map<Integer, Set<String>> topicGroups() {
+        Map<Integer, Set<String>> topicGroups = new HashMap<>();
 
-        for (Set<String> nodeGroup : generateNodeGroups(nodeGroups)) {
+        if (nodeGroups == null) {
+            nodeGroups = nodeGroups();
+        } else if (!nodeGroups.equals(nodeGroups())) {
+            throw new TopologyException("topology has mutated");
+        }
+
+        for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
             Set<String> topicGroup = new HashSet<>();
-            for (String node : nodeGroup) {
+            for (String node : entry.getValue()) {
                 String[] topics = nodeToTopics.get(node);
                 if (topics != null)
                     topicGroup.addAll(Arrays.asList(topics));
             }
-            topicGroups.add(Collections.unmodifiableSet(topicGroup));
+            topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup));
         }
 
-        return Collections.unmodifiableList(topicGroups);
+        return Collections.unmodifiableMap(topicGroups);
     }
 
-    private Collection<Set<String>> generateNodeGroups(QuickUnion<String> grouping) {
-        HashMap<String, Set<String>> nodeGroupMap = new HashMap<>();
+    private Map<Integer, Set<String>> nodeGroups() {
+        HashMap<Integer, Set<String>> nodeGroups = new HashMap<>();
+        HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
 
-        for (String nodeName : nodeNames) {
-            String root = grouping.root(nodeName);
-            Set<String> nodeGroup = nodeGroupMap.get(root);
+        int nodeGroupId = 0;
+
+        // Go through source nodes first. This makes the group id assignment easy to predict in tests
+        for (String nodeName : Utils.sorted(nodeToTopics.keySet())) {
+            String root = nodeGrouper.root(nodeName);
+            Set<String> nodeGroup = rootToNodeGroup.get(root);
             if (nodeGroup == null) {
                 nodeGroup = new HashSet<>();
-                nodeGroupMap.put(root, nodeGroup);
+                rootToNodeGroup.put(root, nodeGroup);
+                nodeGroups.put(nodeGroupId++, nodeGroup);
             }
             nodeGroup.add(nodeName);
         }
 
-        return nodeGroupMap.values();
+        // Go through non-source nodes
+        for (String nodeName : Utils.sorted(nodeNames)) {
+            if (!nodeToTopics.containsKey(nodeName)) {
+                String root = nodeGrouper.root(nodeName);
+                Set<String> nodeGroup = rootToNodeGroup.get(root);
+                if (nodeGroup == null) {
+                    nodeGroup = new HashSet<>();
+                    rootToNodeGroup.put(root, nodeGroup);
+                    nodeGroups.put(nodeGroupId++, nodeGroup);
+                }
+                nodeGroup.add(nodeName);
+            }
+        }
+
+        return nodeGroups;
     }
 
     /**

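With the sorted, sources-first traversal above, group ids are deterministic. A hedged sketch (hypothetical node and topic names; ```MockProcessorSupplier``` is the test helper used elsewhere in this patch):

```java
import java.util.Map;
import java.util.Set;

import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.MockProcessorSupplier;

public class TopicGroupIdSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source-1", "topic-1");
        builder.addSource("source-2", "topic-2");
        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");

        // Source nodes are visited in sorted order, so ids are predictable:
        //   "source-1" + "processor-1" -> group 0 -> topics { "topic-1" }
        //   "source-2"                 -> group 1 -> topics { "topic-2" }
        Map<Integer, Set<String>> groups = builder.topicGroups();
        System.out.println(groups);  // e.g. {0=[topic-1], 1=[topic-2]}
    }
}
```
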
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index ee5bb93..f7b14ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
     private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
 
     private PartitionGrouper partitionGrouper;
-    private Map<TopicPartition, Set<Integer>> partitionToTaskIds;
+    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
 
     @Override
     public void configure(Map<String, ?> configs) {
@@ -67,29 +68,30 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
 
     @Override
     public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
-        Map<Integer, List<TopicPartition>> partitionGroups = partitionGrouper.partitionGroups(metadata);
+        Map<TaskId, Set<TopicPartition>> partitionGroups = partitionGrouper.partitionGroups(metadata);
 
         String[] clientIds = subscriptions.keySet().toArray(new String[subscriptions.size()]);
-        Integer[] taskIds = partitionGroups.keySet().toArray(new Integer[partitionGroups.size()]);
+        TaskId[] taskIds = partitionGroups.keySet().toArray(new TaskId[partitionGroups.size()]);
 
         Map<String, Assignment> assignment = new HashMap<>();
 
         for (int i = 0; i < clientIds.length; i++) {
             List<TopicPartition> partitions = new ArrayList<>();
-            List<Integer> ids = new ArrayList<>();
+            List<TaskId> ids = new ArrayList<>();
             for (int j = i; j < taskIds.length; j += clientIds.length) {
-                Integer taskId = taskIds[j];
+                TaskId taskId = taskIds[j];
                 for (TopicPartition partition : partitionGroups.get(taskId)) {
                     partitions.add(partition);
                     ids.add(taskId);
                 }
             }
-            ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 4);
+            ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 8);
             //version
             buf.putInt(1);
             // encode task ids
-            for (Integer id : ids) {
-                buf.putInt(id);
+            for (TaskId id : ids) {
+                buf.putInt(id.topicGroupId);
+                buf.putInt(id.partition);
             }
             buf.rewind();
             assignment.put(clientIds[i], new Assignment(partitions, buf));
@@ -104,19 +106,19 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         ByteBuffer data = assignment.userData();
         data.rewind();
 
-        Map<TopicPartition, Set<Integer>> partitionToTaskIds = new HashMap<>();
+        Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
 
         // check version
         int version = data.getInt();
         if (version == 1) {
             for (TopicPartition partition : partitions) {
-                Set<Integer> taskIds = partitionToTaskIds.get(partition);
+                Set<TaskId> taskIds = partitionToTaskIds.get(partition);
                 if (taskIds == null) {
                     taskIds = new HashSet<>();
                     partitionToTaskIds.put(partition, taskIds);
                 }
                 // decode a task id
-                taskIds.add(data.getInt());
+                taskIds.add(new TaskId(data.getInt(), data.getInt()));
             }
         } else {
             KafkaException ex = new KafkaException("unknown assignment data version: " + version);
@@ -126,7 +128,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         this.partitionToTaskIds = partitionToTaskIds;
     }
 
-    public Set<Integer> taskIds(TopicPartition partition) {
+    public Set<TaskId> taskIds(TopicPartition partition) {
         return partitionToTaskIds.get(partition);
     }
 

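The userData layout implied by the hunk above is a 4-byte version followed by one 8-byte (topic group id, partition) pair per assigned task. A round-trip sketch (the class name is made up, and the simplified decode ignores the positional pairing with the partition list that onAssignment performs):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.TaskId;

public class AssignmentUserDataSketch {

    static ByteBuffer encode(List<TaskId> ids) {
        ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 8);  // was 4 bytes per id, now 8
        buf.putInt(1);                                             // version
        for (TaskId id : ids) {
            buf.putInt(id.topicGroupId);
            buf.putInt(id.partition);
        }
        buf.rewind();
        return buf;
    }

    static List<TaskId> decode(ByteBuffer data) {
        data.rewind();
        if (data.getInt() != 1)
            throw new IllegalArgumentException("unknown assignment data version");
        List<TaskId> ids = new ArrayList<>();
        while (data.hasRemaining())
            ids.add(new TaskId(data.getInt(), data.getInt()));
        return ids;
    }
}
```
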
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index dfc838c..3c1e059 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.TaskId;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
 
     private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
 
-    private final int id;
+    private final TaskId id;
     private final StreamTask task;
     private final StreamingMetrics metrics;
     private final RecordCollector collector;
@@ -49,7 +50,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     private boolean initialized;
 
     @SuppressWarnings("unchecked")
-    public ProcessorContextImpl(int id,
+    public ProcessorContextImpl(TaskId id,
                                 StreamTask task,
                                 StreamingConfig config,
                                 RecordCollector collector,
@@ -78,8 +79,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         this.initialized = true;
     }
 
-    @Override
-    public int id() {
+    public TaskId id() {
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 1de6f9b..d83d721 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class StreamTask implements Punctuator {
 
     private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
 
-    private final int id;
+    private final TaskId id;
     private final int maxBufferedSize;
 
     private final Consumer consumer;
@@ -78,7 +79,7 @@ public class StreamTask implements Punctuator {
      * @param config                the {@link StreamingConfig} specified by the user
      * @param metrics               the {@link StreamingMetrics} created by the thread
      */
-    public StreamTask(int id,
+    public StreamTask(TaskId id,
                       Consumer<byte[], byte[]> consumer,
                       Producer<byte[], byte[]> producer,
                       Consumer<byte[], byte[]> restoreConsumer,
@@ -116,8 +117,8 @@ public class StreamTask implements Punctuator {
 
         // create the processor state manager
         try {
-            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Integer.toString(id));
-            this.stateMgr = new ProcessorStateManager(id, stateFile, restoreConsumer);
+            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer);
         } catch (IOException e) {
             throw new KafkaException("Error while creating the state manager", e);
         }
@@ -138,7 +139,7 @@ public class StreamTask implements Punctuator {
         this.processorContext.initialized();
     }
 
-    public int id() {
+    public TaskId id() {
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e3803a1..abc5c5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +76,7 @@ public class StreamThread extends Thread {
     protected final Consumer<byte[], byte[]> consumer;
     protected final Consumer<byte[], byte[]> restoreConsumer;
 
-    private final Map<Integer, StreamTask> tasks;
+    private final Map<TaskId, StreamTask> tasks;
     private final String clientId;
     private final Time time;
     private final File stateDir;
@@ -199,7 +200,7 @@ public class StreamThread extends Thread {
         running.set(false);
     }
 
-    public Map<Integer, StreamTask> tasks() {
+    public Map<TaskId, StreamTask> tasks() {
         return Collections.unmodifiableMap(tasks);
     }
 
@@ -375,7 +376,7 @@ public class StreamThread extends Thread {
             if (stateDirs != null) {
                 for (File dir : stateDirs) {
                     try {
-                        int id = Integer.parseInt(dir.getName());
+                        TaskId id = TaskId.parse(dir.getName());
 
                         // try to acquire the exclusive lock on the state directory
                         FileLock directoryLock = null;
@@ -396,7 +397,7 @@ public class StreamThread extends Thread {
                                 }
                             }
                         }
-                    } catch (NumberFormatException e) {
+                    } catch (TaskId.TaskIdFormatException e) {
                        // there may be some unknown files that sit in the same directory;
                        // we should ignore these files instead of trying to delete them
                     }
@@ -407,7 +408,7 @@ public class StreamThread extends Thread {
         }
     }
 
-    protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
         sensors.taskCreationSensor.record();
 
         return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors);
@@ -415,11 +416,11 @@ public class StreamThread extends Thread {
 
     private void addPartitions(Collection<TopicPartition> assignment) {
 
-        HashMap<Integer, Set<TopicPartition>> partitionsForTask = new HashMap<>();
+        HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
 
         for (TopicPartition partition : assignment) {
-            Set<Integer> taskIds = partitionGrouper.taskIds(partition);
-            for (Integer taskId : taskIds) {
+            Set<TaskId> taskIds = partitionGrouper.taskIds(partition);
+            for (TaskId taskId : taskIds) {
                 Set<TopicPartition> partitions = partitionsForTask.get(taskId);
                 if (partitions == null) {
                     partitions = new HashSet<>();
@@ -430,7 +431,7 @@ public class StreamThread extends Thread {
         }
 
         // create the tasks
-        for (Integer taskId : partitionsForTask.keySet()) {
+        for (TaskId taskId : partitionsForTask.keySet()) {
             try {
                 tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
             } catch (Exception e) {

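The state-directory scan above now keys directory names off ```TaskId.toString()```; a reduced sketch of the parse-and-skip pattern (the path is made up, and the locking and cleanup steps are elided):

```java
import java.io.File;

import org.apache.kafka.streams.processor.TaskId;

public class StateDirScanSketch {
    public static void main(String[] args) {
        File stateDir = new File("/tmp/kafka-streams");  // hypothetical state directory
        File[] dirs = stateDir.listFiles();
        if (dirs == null) return;
        for (File dir : dirs) {
            try {
                TaskId id = TaskId.parse(dir.getName());  // e.g. "0_1" -> TaskId(0, 1)
                System.out.println("found state directory for task " + id);
            } catch (TaskId.TaskIdFormatException e) {
                // unknown files may sit in the same directory; ignore them
            }
        }
    }
}
```
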
http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 779bc75..a7f4c12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -73,7 +73,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
 
         this.topic = name;
-        this.partition = context.id();
+        this.partition = context.id().partition;
 
         this.context = context;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
index 7393bb1..1de345e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
@@ -81,7 +81,7 @@ public class RocksDBKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
 
         public RocksDBStore(String name, ProcessorContext context, Serdes<K, V> serdes) {
             this.topic = name;
-            this.partition = context.id();
+            this.partition = context.id().partition;
             this.context = context;
             this.serdes = serdes;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index 388955e..d43fc53 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import org.junit.Test;
 
@@ -29,6 +28,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
@@ -47,28 +47,35 @@ public class DefaultPartitionGrouperTest {
     @Test
     public void testGrouping() {
         PartitionGrouper grouper = new DefaultPartitionGrouper();
-        int taskId;
-        Map<Integer, List<TopicPartition>> expected;
+        int topicGroupId;
+        Map<TaskId, Set<TopicPartition>> expected;
+        Map<Integer, Set<String>> topicGroups;
 
-        grouper.topicGroups(mkList(mkSet("topic1"), mkSet("topic2")));
+        topicGroups = new HashMap<>();
+        topicGroups.put(0, mkSet("topic1"));
+        topicGroups.put(1, mkSet("topic2"));
+        grouper.topicGroups(topicGroups);
 
         expected = new HashMap<>();
-        taskId = 0;
-        expected.put(taskId++, mkList(new TopicPartition("topic1", 0)));
-        expected.put(taskId++, mkList(new TopicPartition("topic1", 1)));
-        expected.put(taskId++, mkList(new TopicPartition("topic1", 2)));
-        expected.put(taskId++, mkList(new TopicPartition("topic2", 0)));
-        expected.put(taskId,   mkList(new TopicPartition("topic2", 1)));
+        topicGroupId = 0;
+        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
+        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
+        expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
+        topicGroupId++;
+        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
+        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
 
         assertEquals(expected, grouper.partitionGroups(metadata));
 
-        grouper.topicGroups(mkList(mkSet("topic1", "topic2")));
+        topicGroups = new HashMap<>();
+        topicGroups.put(0, mkSet("topic1", "topic2"));
+        grouper.topicGroups(topicGroups);
 
         expected = new HashMap<>();
-        taskId = 0;
-        expected.put(taskId++, mkList(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
-        expected.put(taskId++, mkList(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
-        expected.put(taskId,   mkList(new TopicPartition("topic1", 2)));
+        topicGroupId = 0;
+        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
+        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
+        expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
 
         assertEquals(expected, grouper.partitionGroups(metadata));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 05d24d3..b77c253 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -25,8 +25,10 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class TopologyBuilderTest {
@@ -121,16 +123,29 @@ public class TopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        Collection<Set<String>> topicGroups = builder.topicGroups();
+        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
+
+        Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
+        expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
+        expectedTopicGroups.put(1, set("topic-3", "topic-4"));
+        expectedTopicGroups.put(2, set("topic-5"));
 
         assertEquals(3, topicGroups.size());
-        assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("topic-3", "topic-4"), mkSet("topic-5")), new HashSet<>(topicGroups));
+        assertEquals(expectedTopicGroups, topicGroups);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
         assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
     }
 
+    private <T> Set<T> set(T... items) {
+        Set<T> set = new HashSet<>();
+        for (T item : items) {
+            set.add(item);
+        }
+        return set;
+    }
+
     private <T> List<T> list(T... elems) {
         return Arrays.asList(elems);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 92b8684..0b828f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockSourceNode;
 import org.junit.Test;
 import org.junit.Before;
@@ -98,7 +99,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+            StreamTask task = new StreamTask(new TaskId(0, 0), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@@ -149,7 +150,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+            StreamTask task = new StreamTask(new TaskId(1, 1), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cbb2558..d5011a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
@@ -81,11 +82,11 @@ public class StreamThreadTest {
     PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));
 
     // task0 is unused
-    private final int task1 = 1;
-    private final int task2 = 2;
-    // task3 is unused
-    private final int task4 = 4;
-    private final int task5 = 5;
+    private final TaskId task1 = new TaskId(0, 1);
+    private final TaskId task2 = new TaskId(0, 2);
+    private final TaskId task3 = new TaskId(0, 3);
+    private final TaskId task4 = new TaskId(1, 1);
+    private final TaskId task5 = new TaskId(1, 2);
 
     private Properties configProps() {
         return new Properties() {
@@ -104,7 +105,7 @@ public class StreamThreadTest {
     private static class TestStreamTask extends StreamTask {
         public boolean committed = false;
 
-        public TestStreamTask(int id,
+        public TestStreamTask(TaskId id,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
@@ -140,7 +141,7 @@ public class StreamThreadTest {
 
         StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
             @Override
-            protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
             }
         };
@@ -240,9 +241,9 @@ public class StreamThreadTest {
 
             StreamingConfig config = new StreamingConfig(props);
 
-            File stateDir1 = new File(baseDir, "1");
-            File stateDir2 = new File(baseDir, "2");
-            File stateDir3 = new File(baseDir, "3");
+            File stateDir1 = new File(baseDir, task1.toString());
+            File stateDir2 = new File(baseDir, task2.toString());
+            File stateDir3 = new File(baseDir, task3.toString());
             File extraDir = new File(baseDir, "X");
             stateDir1.mkdir();
             stateDir2.mkdir();
@@ -264,7 +265,7 @@ public class StreamThreadTest {
                 }
 
                 @Override
-                protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
                 }
             };
@@ -385,7 +386,7 @@ public class StreamThreadTest {
                 }
 
                 @Override
-                protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
                 }
             };

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 4dfa9c2..7e1512a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
 
@@ -245,8 +246,8 @@ public class KeyValueStoreTestDriver<K, V> {
         this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
                 serdes.valueDeserializer(), recordCollector) {
             @Override
-            public int id() {
-                return 1;
+            public TaskId id() {
+                return new TaskId(0, 1);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 16df9c5..40f11a0 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 
 import java.io.File;
@@ -82,8 +83,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public int id() {
-        return 0;
+    public TaskId id() {
+        return new TaskId(0, 0);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f9b9e4/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 8eb2c62..0c4b1a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StreamTask;
@@ -59,14 +60,14 @@ import java.util.concurrent.atomic.AtomicLong;
  * and {@link org.apache.kafka.clients.producer.Producer}s that read and write raw {@code byte[]} messages. You can either deal
  * with messages that have {@code byte[]} keys and values, or you can supply the {@link Serializer}s and {@link Deserializer}s
  * that the driver can use to convert the keys and values into objects.
- * 
+ *
  * <h2>Driver setup</h2>
  * <p>
  * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamingConfig}. The
  * configuration needs to be representative of what you'd supply to the real topology, so that means including several key
  * properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list
  * (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
- * 
+ *
  * <pre>
  * StringSerializer strSerializer = new StringSerializer();
  * StringDeserializer strDeserializer = new StringDeserializer();
@@ -81,34 +82,34 @@ import java.util.concurrent.atomic.AtomicLong;
  * TopologyBuilder builder = ...
  * ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
  * </pre>
- * 
+ *
  * <h2>Processing messages</h2>
  * <p>
  * Your test can supply new input records on any of the topics that the topology's sources consume. Here's an example of an
  * input message on the topic named {@code input-topic}:
- * 
+ *
  * <pre>
  * driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
  * </pre>
- * 
+ *
  * Immediately, the driver will pass the input message through to the appropriate source that consumes the named topic,
  * and will invoke the processor(s) downstream of the source. If your topology's processors forward messages to sinks,
  * your test can then consume these output messages to verify they match the expected outcome. For example, if our topology
  * should have generated 2 messages on {@code output-topic-1} and 1 message on {@code output-topic-2}, then our test can
  * obtain these messages using the {@link #readOutput(String, Deserializer, Deserializer)} method:
- * 
+ *
  * <pre>
  * ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
  * ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
  * ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
  * </pre>
- * 
+ *
  * Again, our example topology generates messages with string keys and values, so we supply our string deserializer instance
  * for use on both the keys and values. Your test logic can then verify whether these output records are correct.
  * <p>
  * Finally, when completed, make sure your tests {@link #close()} the driver to release all resources and
  * {@link org.apache.kafka.streams.processor.Processor}s.
- * 
+ *
  * <h2>Processor state</h2>
  * <p>
  * Some processors use Kafka {@link StateStore state storage}, so this driver class provides the {@link #getStateStore(String)}
@@ -122,7 +123,7 @@ public class ProcessorTopologyTestDriver {
 
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
 
-    private final int id;
+    private final TaskId id;
     private final ProcessorTopology topology;
     private final StreamTask task;
     private final MockConsumer<byte[], byte[]> consumer;
@@ -139,7 +140,7 @@ public class ProcessorTopologyTestDriver {
      * @param storeNames the optional names of the state stores that are used by the topology
      */
     public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) {
-        id = 0;
+        id = new TaskId(0, 0);
         topology = builder.build();
 
         // Set up the consumer and producer ...
@@ -177,7 +178,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Send an input message with the given key and value on the specified topic to the topology, and then commit the messages.
-     * 
+     *
      * @param topicName the name of the topic on which the message is to be sent
      * @param key the raw message key
      * @param value the raw message value
@@ -207,7 +208,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Send an input message with the given key and value on the specified topic to the topology.
-     * 
+     *
      * @param topicName the name of the topic on which the message is to be sent
      * @param key the raw message key
      * @param value the raw message value
@@ -221,7 +222,7 @@ public class ProcessorTopologyTestDriver {
     /**
      * Read the next record from the given topic. These records were output by the topology during the previous calls to
      * {@link #process(String, byte[], byte[])}.
-     * 
+     *
      * @param topic the name of the topic
      * @return the next record on that topic, or null if there is no record available
      */
@@ -234,7 +235,7 @@ public class ProcessorTopologyTestDriver {
     /**
      * Read the next record from the given topic. These records were output by the topology during the previous calls to
      * {@link #process(String, byte[], byte[])}.
-     * 
+     *
      * @param topic the name of the topic
      * @param keyDeserializer the deserializer for the key type
      * @param valueDeserializer the deserializer for the value type
@@ -259,7 +260,7 @@ public class ProcessorTopologyTestDriver {
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
      * {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
-     * 
+     *
      * @param name the name of the store
      * @return the state store, or null if no store has been registered with the given name
      * @see #getKeyValueStore(String)
@@ -276,7 +277,7 @@ public class ProcessorTopologyTestDriver {
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
      * {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
      * <p>
-     * 
+     *
      * @param name the name of the store
      * @return the key value store, or null if no {@link KeyValueStore} has been registered with the given name
      * @see #getStateStore(String)
@@ -297,12 +298,12 @@ public class ProcessorTopologyTestDriver {
     /**
      * Utility method that creates the {@link MockConsumer} used for restoring state, which should not be done by this
      * driver object unless this method is overwritten with a functional consumer.
-     * 
+     *
      * @param id the ID of the stream task
      * @param storeNames the names of the stores that this
      * @return the mock consumer; never null
      */
-    protected MockConsumer<byte[], byte[]> createRestoreConsumer(int id, String... storeNames) {
+    protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, String... storeNames) {
         MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
             @Override
             public synchronized void seekToEnd(TopicPartition... partitions) {
@@ -327,10 +328,10 @@ public class ProcessorTopologyTestDriver {
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
             List<PartitionInfo> partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName, id, null, null, null));
+            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
             consumer.updatePartitions(topicName, partitionInfos);
-            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id), 0L));
+            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
         }
         return consumer;
     }
-}
\ No newline at end of file
+}