You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/29 17:48:32 UTC

[1/2] kafka git commit: KAFKA-4117: Stream PartitionAssignor cleanup

Repository: kafka
Updated Branches:
  refs/heads/trunk 0dd9607f9 -> a4ab9d02a


http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e46a016..cd9b7a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -90,13 +90,13 @@ public class StreamPartitionAssignorTest {
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
     private final TaskId task3 = new TaskId(0, 3);
-    private String userEndPoint = null;
+    private String userEndPoint = "localhost:2171";
 
     private Properties configProps() {
         return new Properties() {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
-                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
@@ -297,12 +297,12 @@ public class StreamPartitionAssignorTest {
         TaskId task10 = new TaskId(1, 0);
         TaskId task11 = new TaskId(1, 1);
         TaskId task12 = new TaskId(1, 2);
+        List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-
         StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
@@ -323,14 +323,43 @@ public class StreamPartitionAssignorTest {
         assertEquals(2, assignments.get("consumer11").partitions().size());
         assertEquals(2, assignments.get("consumer20").partitions().size());
 
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
+        AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+        assertEquals(2, info10.activeTasks.size());
+        assertEquals(2, info11.activeTasks.size());
+        assertEquals(2, info20.activeTasks.size());
+
+        Set<TaskId> allTasks = new HashSet<>();
+        allTasks.addAll(info10.activeTasks);
+        allTasks.addAll(info11.activeTasks);
+        allTasks.addAll(info20.activeTasks);
+        assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
-        assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
-        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
-        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
+        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
+
+        assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
+        assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
+        assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
+    }
+
+    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, TopologyBuilder.TopicsInfo> topicGroups) {
+        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
+
+        Set<TaskId> ids = new HashSet<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
+
+            if (stateChangelogTopics.contains(changelogTopic)) {
+                for (TaskId id : tasks) {
+                    if (id.topicGroupId == entry.getKey())
+                        ids.add(id);
+                }
+            }
+        }
+        return ids;
     }
 
     @Test
@@ -406,48 +435,6 @@ public class StreamPartitionAssignorTest {
         assertEquals(allTasks, allStandbyTasks);
     }
 
-    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
-
-        // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.
-
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-
-        // check if the number of assigned partitions == the size of active task id list
-        assertEquals(assignment.partitions().size(), info.activeTasks.size());
-
-        // check if active tasks are consistent
-        List<TaskId> activeTasks = new ArrayList<>();
-        Set<String> activeTopics = new HashSet<>();
-        for (TopicPartition partition : assignment.partitions()) {
-            // since default grouper, taskid.partition == partition.partition()
-            activeTasks.add(new TaskId(0, partition.partition()));
-            activeTopics.add(partition.topic());
-        }
-        assertEquals(activeTasks, info.activeTasks);
-
-        // check if active partitions cover all topics
-        assertEquals(allTopics, activeTopics);
-
-        // check if standby tasks are consistent
-        Set<String> standbyTopics = new HashSet<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
-            TaskId id = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
-            for (TopicPartition partition : partitions) {
-                // since default grouper, taskid.partition == partition.partition()
-                assertEquals(id.partition, partition.partition());
-
-                standbyTopics.add(partition.topic());
-            }
-        }
-
-        if (info.standbyTasks.size() > 0)
-            // check if standby partitions cover all topics
-            assertEquals(allTopics, standbyTopics);
-
-        return info;
-    }
-
     @Test
     public void testOnAssignment() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
@@ -468,16 +455,18 @@ public class StreamPartitionAssignorTest {
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
 
         List<TaskId> activeTaskList = Utils.mkList(task0, task3);
+        Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
-        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+        activeTasks.put(task0, Utils.mkSet(t1p0));
+        activeTasks.put(task3, Utils.mkSet(t2p3));
+        standbyTasks.put(task1, Utils.mkSet(t1p0));
+        standbyTasks.put(task2, Utils.mkSet(t2p0));
 
         AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
         PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
         partitionAssignor.onAssignment(assignment);
 
-        assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
-        assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
+        assertEquals(activeTasks, partitionAssignor.activeTasks());
         assertEquals(standbyTasks, partitionAssignor.standbyTasks());
     }
 
@@ -621,7 +610,7 @@ public class StreamPartitionAssignorTest {
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
         final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
-        final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHostState.get(new HostInfo("localhost", 8080));
+        final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
         assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
                 new TopicPartition("topic1", 1),
                 new TopicPartition("topic1", 2)), topicPartitions);
@@ -725,13 +714,54 @@ public class StreamPartitionAssignorTest {
 
     }
 
+    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+        // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.
+
+        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+        // check if the number of assigned partitions == the size of active task id list
+        assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+        // check if active tasks are consistent
+        List<TaskId> activeTasks = new ArrayList<>();
+        Set<String> activeTopics = new HashSet<>();
+        for (TopicPartition partition : assignment.partitions()) {
+            // since default grouper, taskid.partition == partition.partition()
+            activeTasks.add(new TaskId(0, partition.partition()));
+            activeTopics.add(partition.topic());
+        }
+        assertEquals(activeTasks, info.activeTasks);
+
+        // check if active partitions cover all topics
+        assertEquals(allTopics, activeTopics);
+
+        // check if standby tasks are consistent
+        Set<String> standbyTopics = new HashSet<>();
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+            TaskId id = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+            for (TopicPartition partition : partitions) {
+                // since default grouper, taskid.partition == partition.partition()
+                assertEquals(id.partition, partition.partition());
+
+                standbyTopics.add(partition.topic());
+            }
+        }
+
+        if (info.standbyTasks.size() > 0)
+            // check if standby partitions cover all topics
+            assertEquals(allTopics, standbyTopics);
+
+        return info;
+    }
 
     private class MockInternalTopicManager extends InternalTopicManager {
 
-        public Map<String, Integer> readyTopics = new HashMap<>();
-        public MockConsumer<byte[], byte[]> restoreConsumer;
+        Map<String, Integer> readyTopics = new HashMap<>();
+        MockConsumer<byte[], byte[]> restoreConsumer;
 
-        public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
+        MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
             super();
 
             this.restoreConsumer = restoreConsumer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 411e02d..f4772f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -109,7 +109,7 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-three", 0, null, null, null),
                 new PartitionInfo("topic-four", 0, null, null, null));
 
-        cluster = new Cluster(Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet());
+        cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
         discovery = new StreamsMetadataState(builder);
         discovery.onChange(hostToPartitions, cluster);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ce94a23..cfa0e61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -64,7 +64,7 @@ public class AssignmentInfoTest {
         final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
         assertEquals(oldVersion.activeTasks, decoded.activeTasks);
         assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
-        assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
+        assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
         assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
index 4333087..52ca0a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
@@ -33,23 +33,34 @@ import static org.junit.Assert.assertTrue;
 
 public class TaskAssignorTest {
 
+    private static Map<Integer, ClientState<Integer>> copyStates(Map<Integer, ClientState<Integer>> states) {
+        Map<Integer, ClientState<Integer>> copy = new HashMap<>();
+        for (Map.Entry<Integer, ClientState<Integer>> entry : states.entrySet()) {
+            copy.put(entry.getKey(), entry.getValue().copy());
+        }
+
+        return copy;
+    }
+
     @Test
     public void testAssignWithoutStandby() {
-        HashMap<Integer, ClientState<Integer>> states = new HashMap<>();
+        HashMap<Integer, ClientState<Integer>> statesWithNoPrevTasks = new HashMap<>();
         for (int i = 0; i < 6; i++) {
-            states.put(i, new ClientState<Integer>(1d));
+            statesWithNoPrevTasks.put(i, new ClientState<Integer>(1d));
         }
         Set<Integer> tasks;
-        Map<Integer, ClientState<Integer>> assignments;
         int numActiveTasks;
         int numAssignedTasks;
 
+        Map<Integer, ClientState<Integer>> states;
+
         // # of clients and # of tasks are equal.
+        states = copyStates(statesWithNoPrevTasks);
         tasks = mkSet(0, 1, 2, 3, 4, 5);
-        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+        TaskAssignor.assign(states, tasks, 0);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertEquals(1, assignment.activeTasks.size());
@@ -60,10 +71,11 @@ public class TaskAssignorTest {
 
         // # of clients < # of tasks
         tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
-        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 0);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(1 <= assignment.activeTasks.size());
@@ -76,10 +88,11 @@ public class TaskAssignorTest {
 
         // # of clients > # of tasks
         tasks = mkSet(0, 1, 2, 3);
-        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 0);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -93,12 +106,12 @@ public class TaskAssignorTest {
 
     @Test
     public void testAssignWithStandby() {
-        HashMap<Integer, ClientState<Integer>> states = new HashMap<>();
+        HashMap<Integer, ClientState<Integer>> statesWithNoPrevTasks = new HashMap<>();
         for (int i = 0; i < 6; i++) {
-            states.put(i, new ClientState<Integer>(1d));
+            statesWithNoPrevTasks.put(i, new ClientState<Integer>(1d));
         }
         Set<Integer> tasks;
-        Map<Integer, ClientState<Integer>> assignments;
+        Map<Integer, ClientState<Integer>> states;
         int numActiveTasks;
         int numAssignedTasks;
 
@@ -108,8 +121,9 @@ public class TaskAssignorTest {
         // 1 standby replicas.
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
-        for (ClientState<Integer> assignment : assignments.values()) {
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 1);
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertEquals(1, assignment.activeTasks.size());
@@ -122,10 +136,11 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 1);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(1 <= assignment.activeTasks.size());
@@ -140,10 +155,11 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1, 2, 3);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 1);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -158,10 +174,11 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 1);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -173,10 +190,11 @@ public class TaskAssignorTest {
         assertEquals(tasks.size() * 2, numAssignedTasks);
 
         // 2 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 2, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 2);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -187,10 +205,11 @@ public class TaskAssignorTest {
         assertEquals(tasks.size() * 3, numAssignedTasks);
 
         // 3 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 3, "TaskAssignorTest-TestAssignWithStandby");
+        states = copyStates(statesWithNoPrevTasks);
+        TaskAssignor.assign(states, tasks, 3);
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        for (ClientState<Integer> assignment : assignments.values()) {
+        for (ClientState<Integer> assignment : states.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
             assertTrue(0 <= assignment.activeTasks.size());
@@ -205,27 +224,29 @@ public class TaskAssignorTest {
     @Test
     public void testStickiness() {
         List<Integer> tasks;
-        Map<Integer, ClientState<Integer>> states;
+        Map<Integer, ClientState<Integer>> statesWithPrevTasks;
         Map<Integer, ClientState<Integer>> assignments;
         int i;
 
         // # of clients and # of tasks are equal.
+        Map<Integer, ClientState<Integer>> states;
         tasks = mkList(0, 1, 2, 3, 4, 5);
         Collections.shuffle(tasks);
-        states = new HashMap<>();
+        statesWithPrevTasks = new HashMap<>();
         i = 0;
         for (int task : tasks) {
             ClientState<Integer> state = new ClientState<>(1d);
             state.prevActiveTasks.add(task);
             state.prevAssignedTasks.add(task);
-            states.put(i++, state);
+            statesWithPrevTasks.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0, "TaskAssignorTest-TestStickiness");
+        states = copyStates(statesWithPrevTasks);
+        TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0);
         for (int client : states.keySet()) {
-            Set<Integer> oldActive = states.get(client).prevActiveTasks;
-            Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
-            Set<Integer> newActive = assignments.get(client).activeTasks;
-            Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+            Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+            Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+            Set<Integer> newActive = states.get(client).activeTasks;
+            Set<Integer> newAssigned = states.get(client).assignedTasks;
 
             assertEquals(oldActive, newActive);
             assertEquals(oldAssigned, newAssigned);
@@ -234,7 +255,7 @@ public class TaskAssignorTest {
         // # of clients > # of tasks
         tasks = mkList(0, 1, 2, 3, -1, -1);
         Collections.shuffle(tasks);
-        states = new HashMap<>();
+        statesWithPrevTasks = new HashMap<>();
         i = 0;
         for (int task : tasks) {
             ClientState<Integer> state = new ClientState<>(1d);
@@ -242,14 +263,15 @@ public class TaskAssignorTest {
                 state.prevActiveTasks.add(task);
                 state.prevAssignedTasks.add(task);
             }
-            states.put(i++, state);
+            statesWithPrevTasks.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0, "TaskAssignorTest-TestStickiness");
+        states = copyStates(statesWithPrevTasks);
+        TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0);
         for (int client : states.keySet()) {
-            Set<Integer> oldActive = states.get(client).prevActiveTasks;
-            Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
-            Set<Integer> newActive = assignments.get(client).activeTasks;
-            Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+            Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+            Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+            Set<Integer> newActive = states.get(client).activeTasks;
+            Set<Integer> newAssigned = states.get(client).assignedTasks;
 
             assertEquals(oldActive, newActive);
             assertEquals(oldAssigned, newAssigned);
@@ -258,20 +280,21 @@ public class TaskAssignorTest {
         // # of clients < # of tasks
         List<Set<Integer>> taskSets = mkList(mkSet(0, 1), mkSet(2, 3), mkSet(4, 5), mkSet(6, 7), mkSet(8, 9), mkSet(10, 11));
         Collections.shuffle(taskSets);
-        states = new HashMap<>();
+        statesWithPrevTasks = new HashMap<>();
         i = 0;
         for (Set<Integer> taskSet : taskSets) {
             ClientState<Integer> state = new ClientState<>(1d);
             state.prevActiveTasks.addAll(taskSet);
             state.prevAssignedTasks.addAll(taskSet);
-            states.put(i++, state);
+            statesWithPrevTasks.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0, "TaskAssignorTest-TestStickiness");
+        states = copyStates(statesWithPrevTasks);
+        TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0);
         for (int client : states.keySet()) {
-            Set<Integer> oldActive = states.get(client).prevActiveTasks;
-            Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
-            Set<Integer> newActive = assignments.get(client).activeTasks;
-            Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+            Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+            Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+            Set<Integer> newActive = states.get(client).activeTasks;
+            Set<Integer> newAssigned = states.get(client).assignedTasks;
 
             Set<Integer> intersection = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 7675f9b..4e8a497 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -66,8 +66,10 @@ public class StoreChangeLoggerTest {
         }
     };
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void testAddRemove() {
+    public void testAddRemove() throws Exception {
+
         context.setTime(1);
         written.put(0, "zero");
         changeLogger.add(0);
@@ -75,26 +77,27 @@ public class StoreChangeLoggerTest {
         changeLogger.add(1);
         written.put(2, "two");
         changeLogger.add(2);
-        assertEquals(3, changeLogger.numDirty());
-        assertEquals(0, changeLogger.numRemoved());
+
+        assertEquals(3, changeLogger.dirty.size());
+        assertEquals(0, changeLogger.removed.size());
 
         changeLogger.delete(0);
         changeLogger.delete(1);
         written.put(3, "three");
         changeLogger.add(3);
-        assertEquals(2, changeLogger.numDirty());
-        assertEquals(2, changeLogger.numRemoved());
+        assertEquals(2, changeLogger.dirty.size());
+        assertEquals(2, changeLogger.removed.size());
 
         written.put(0, "zero-again");
         changeLogger.add(0);
-        assertEquals(3, changeLogger.numDirty());
-        assertEquals(1, changeLogger.numRemoved());
+        assertEquals(3, changeLogger.dirty.size());
+        assertEquals(1, changeLogger.removed.size());
 
         written.put(4, "four");
         changeLogger.add(4);
         changeLogger.maybeLogChange(getter);
-        assertEquals(0, changeLogger.numDirty());
-        assertEquals(0, changeLogger.numRemoved());
+        assertEquals(0, changeLogger.dirty.size());
+        assertEquals(0, changeLogger.removed.size());
         assertEquals(5, logged.size());
         assertEquals("zero-again", logged.get(0));
         assertEquals(null, logged.get(1));


[2/2] kafka git commit: KAFKA-4117: StreamPartitionAssignor cleanup

Posted by gu...@apache.org.
KAFKA-4117: StreamPartitionAssignor cleanup

1. Create a new `ClientMetadata` class that collapses `Set<String> consumerMemberIds`, `ClientState<TaskId> state`, and `HostInfo hostInfo` into a single object.

2. For clarity, stop reusing `stateChangelogTopicToTaskIds` and `internalSourceTopicToTaskIds` to access the (sub-)topology's internal repartition and changelog topics. Also use the source topics' num.partitions to set the num.partitions of the repartition topics, and clarify that the topology must NOT contain cycles, since otherwise the while loop that computes these partition counts will fail.

3. Run `ensure-copartition` at the end to adjust, if necessary, the number of partitions of repartition topics so that it equals that of the other co-partitioned topics.

4. Refactor `ClientState` and update the logic of `TaskAssignor` for clarity.

5. Change the default `clientId` from `applicationId-suffix` to `applicationId-processId`, where `processId` is a UUID. This avoids clientId conflicts between instances running in different JVMs, and hence conflicts in metrics.

6. Enforce that the `assignment` partitions have the same size as the `activeTask` taskIds, and hence a 1-1 mapping between them.

7. Remove the `AssignmentSupplier` class by always constructing the `partitionsByHostState` before assigning tasks to consumers within a client.

8. Remove all unnecessary member variables in `StreamPartitionAssignor`.

9. Some other minor fixes to unit tests, e.g. remove `test only` functions and access the underlying fields directly (via Java field reflection) instead.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Xavier Léauté, Matthias J. Sax, Eno Thereska, Jason Gustafson

Closes #2012 from guozhangwang/K4117-stream-partitionassignro-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a4ab9d02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a4ab9d02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a4ab9d02

Branch: refs/heads/trunk
Commit: a4ab9d02a22e77f0ebca0450e608898d83d4fe18
Parents: 0dd9607
Author: Guozhang Wang <wa...@gmail.com>
Authored: Sat Oct 29 10:48:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Oct 29 10:48:17 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/Cluster.java   |  20 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   4 +-
 .../streams/processor/TopologyBuilder.java      |   8 +-
 .../internals/InternalTopicConfig.java          |   3 +-
 .../streams/processor/internals/SinkNode.java   |   5 -
 .../streams/processor/internals/SourceNode.java |   5 -
 .../internals/StreamPartitionAssignor.java      | 717 ++++++++++---------
 .../processor/internals/StreamThread.java       |  39 +-
 .../internals/assignment/AssignmentInfo.java    |  14 +-
 .../internals/assignment/ClientState.java       |  16 +-
 .../internals/assignment/TaskAssignor.java      |  37 +-
 .../apache/kafka/streams/state/HostInfo.java    |   2 +-
 .../state/internals/StoreChangeLogger.java      |  10 -
 .../integration/FanoutIntegrationTest.java      |   1 -
 .../kstream/internals/KStreamImplTest.java      |   2 +-
 .../kstream/internals/KTableImplTest.java       |  16 +-
 .../streams/processor/TopologyBuilderTest.java  |   4 +-
 .../internals/StreamPartitionAssignorTest.java  | 148 ++--
 .../internals/StreamsMetadataStateTest.java     |   2 +-
 .../assignment/AssignmentInfoTest.java          |   2 +-
 .../internals/assignment/TaskAssignorTest.java  | 115 +--
 .../state/internals/StoreChangeLoggerTest.java  |  21 +-
 22 files changed, 642 insertions(+), 549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 471ae26..3c6475d 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -207,6 +207,16 @@ public final class Cluster {
     }
 
     /**
+     * Get the number of partitions for the given topic
+     * @param topic The topic to get the number of partitions for
+     * @return The number of partitions or null if there is no corresponding metadata
+     */
+    public Integer partitionCountForTopic(String topic) {
+        List<PartitionInfo> partitions = this.partitionsByTopic.get(topic);
+        return partitions == null ? null : partitions.size();
+    }
+
+    /**
      * Get the list of available partitions for this topic
      * @param topic The topic name
      * @return A list of partitions
@@ -225,16 +235,6 @@ public final class Cluster {
     }
 
     /**
-     * Get the number of partitions for the given topic
-     * @param topic The topic to get the number of partitions for
-     * @return The number of partitions or null if there is no corresponding metadata
-     */
-    public Integer partitionCountForTopic(String topic) {
-        List<PartitionInfo> partitionInfos = this.partitionsByTopic.get(topic);
-        return partitionInfos == null ? null : partitionInfos.size();
-    }
-
-    /**
      * Get all topics.
      * @return a set of all topics
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2333db7..e120b31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -47,7 +47,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
@@ -91,7 +90,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class KafkaStreams {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
-    private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.streams";
 
     // container states
@@ -156,7 +154,7 @@ public class KafkaStreams {
 
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
-            clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+            clientId = applicationId + "-" + processId;
 
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
             MetricsReporter.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f1f63..ecac8c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -227,14 +227,14 @@ public class TopologyBuilder {
     public static class TopicsInfo {
         public Set<String> sinkTopics;
         public Set<String> sourceTopics;
-        public Map<String, InternalTopicConfig> interSourceTopics;
         public Map<String, InternalTopicConfig> stateChangelogTopics;
+        public Map<String, InternalTopicConfig> repartitionSourceTopics;
 
-        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> interSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
+        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
             this.sinkTopics = sinkTopics;
             this.sourceTopics = sourceTopics;
-            this.interSourceTopics = interSourceTopics;
             this.stateChangelogTopics = stateChangelogTopics;
+            this.repartitionSourceTopics = repartitionSourceTopics;
         }
 
         @Override
@@ -258,7 +258,7 @@ public class TopologyBuilder {
             return "TopicsInfo{" +
                     "sinkTopics=" + sinkTopics +
                     ", sourceTopics=" + sourceTopics +
-                    ", interSourceTopics=" + interSourceTopics +
+                    ", repartitionSourceTopics=" + repartitionSourceTopics +
                     ", stateChangelogTopics=" + stateChangelogTopics +
                     '}';
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index 45016c8..c23f134 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -30,9 +30,10 @@ public class InternalTopicConfig {
 
     private final String name;
     private final Map<String, String> logConfig;
-    private Long retentionMs;
     private final Set<CleanupPolicy> cleanupPolicies;
 
+    private Long retentionMs;
+
     public InternalTopicConfig(final String name, final Set<CleanupPolicy> defaultCleanupPolicies, final Map<String, String> logConfig) {
         Objects.requireNonNull(name, "name can't be null");
         if (defaultCleanupPolicies.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2b5692d..4e56f61 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -77,11 +77,6 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         // do nothing
     }
 
-    // for test only
-    public Serializer<V> valueSerializer() {
-        return valSerializer;
-    }
-
     /**
      * @return a string representation of this node, useful for debugging.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 4bc3a53..e17509b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -71,11 +71,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         // do nothing
     }
 
-    // for test only
-    public Deserializer<V> valueDeserializer() {
-        return valDeserializer;
-    }
-
     /**
      * @return a string representation of this node, useful for debugging.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index dcba543..009ba1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -46,37 +46,92 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
 
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
-    private String userEndPointConfig;
-    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
-    private Cluster metadataWithInternalTopics;
-
 
     private static class AssignedPartition implements Comparable<AssignedPartition> {
         public final TaskId taskId;
         public final TopicPartition partition;
 
-        public AssignedPartition(TaskId taskId, TopicPartition partition) {
+        AssignedPartition(final TaskId taskId, final TopicPartition partition) {
             this.taskId = taskId;
             this.partition = partition;
         }
 
         @Override
-        public int compareTo(AssignedPartition that) {
+        public int compareTo(final AssignedPartition that) {
             return PARTITION_COMPARATOR.compare(this.partition, that.partition);
         }
     }
 
+    private static class ClientMetadata {
+        final HostInfo hostInfo;
+        final Set<String> consumers;
+        final ClientState<TaskId> state;
+
+        ClientMetadata(final String endPoint) {
+
+            // get the host info if possible
+            if (endPoint != null) {
+                final String host = getHost(endPoint);
+                final Integer port = getPort(endPoint);
+
+                if (host == null || port == null)
+                    throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
+
+                hostInfo = new HostInfo(host, port);
+            } else {
+                hostInfo = null;
+            }
+
+            // initialize the consumer memberIds
+            consumers = new HashSet<>();
+
+            // initialize the client state
+            state = new ClientState<>();
+        }
+
+        void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
+
+            consumers.add(consumerMemberId);
+
+            state.prevActiveTasks.addAll(info.prevTasks);
+            state.prevAssignedTasks.addAll(info.prevTasks);
+            state.prevAssignedTasks.addAll(info.standbyTasks);
+            state.capacity = state.capacity + 1d;
+        }
+
+        @Override
+        public String toString() {
+            return "ClientMetadata{" +
+                    "hostInfo=" + hostInfo +
+                    ", consumers=" + consumers +
+                    ", state=" + state +
+                    '}';
+        }
+    }
+
+    private static class InternalTopicMetadata {
+        public final InternalTopicConfig config;
+
+        public int numPartitions;
+
+        InternalTopicMetadata(final InternalTopicConfig config) {
+            this.config = config;
+            this.numPartitions = -1;
+        }
+    }
+
     private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
         @Override
         public int compare(TopicPartition p1, TopicPartition p2) {
@@ -92,12 +147,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     private StreamThread streamThread;
 
+    private String userEndPoint;
     private int numStandbyReplicas;
-    private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
-    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-    private Map<InternalTopicConfig, Set<TaskId>> stateChangelogTopicToTaskIds;
-    private Map<InternalTopicConfig, Set<TaskId>> internalSourceTopicToTaskIds;
+
+    private Cluster metadataWithInternalTopics;
+    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
+    private Map<TaskId, Set<TopicPartition>> activeTasks;
 
     private InternalTopicManager internalTopicManager;
 
@@ -111,7 +168,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
-
         Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
             KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -130,21 +186,20 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
-            final String[] hostPort = userEndPoint.split(":");
-            if (hostPort.length != 2) {
-                throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
-                                                       " but received %s",
-                        streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
-            } else {
-                try {
-                    Integer.valueOf(hostPort[1]);
-                    this.userEndPointConfig = userEndPoint;
-                } catch (NumberFormatException nfe) {
-                    throw new ConfigException(String.format("stream-thread [%s] Invalid port %s supplied in %s for config %s",
-                            streamThread.getName(), hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
-                }
+            try {
+                String host = getHost(userEndPoint);
+                Integer port = getPort(userEndPoint);
+
+                if (host == null || port == null)
+                    throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
+                                    " but received %s",
+                            streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+            } catch (NumberFormatException nfe) {
+                throw new ConfigException(String.format("stream-thread [%s] Invalid port supplied in %s for config %s",
+                        streamThread.getName(), userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
             }
 
+            this.userEndPoint = userEndPoint;
         }
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
@@ -174,7 +229,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Set<TaskId> prevTasks = streamThread.prevTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
+        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPoint);
 
         if (streamThread.builder.sourceTopicPattern() != null) {
             SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -187,238 +242,252 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
 
-    /**
-     * Internal helper function that creates a Kafka topic
-     * @param topicToTaskIds Map that contains the topic names to be created
-     * @param postPartitionPhase If true, the computation for calculating the number of partitions
-     *                           is slightly different. Set to true after the initial topic-to-partition
-     *                           assignment.
-     * @return
+    /*
+     * This assigns tasks to consumer clients in the following steps.
+     *
+     * 0. check all repartition source topics and use internal topic manager to make sure
+     *    they have been created with the right number of partitions.
+     *
+     * 1. using user customized partition grouper to generate tasks along with their
+     *    assigned partitions; also make sure that the task's corresponding changelog topics
+     *    have been created with the right number of partitions.
+     *
+     * 2. using TaskAssignor to assign tasks to consumer clients.
+     *    - Assign a task to a client which was running it previously.
+     *      If there is no such client, assign a task to a client which has its valid local state.
+     *    - A client may have more than one stream threads.
+     *      The assignor tries to assign tasks to a client proportionally to the number of threads.
+     *    - We try not to assign the same set of tasks to two different clients
+     *    We do the assignment in one-pass. The result may not satisfy above all.
+     *
+     * 3. within each client, tasks are assigned to consumer clients in round-robin manner.
      */
-    private Map<TopicPartition, PartitionInfo> prepareTopic(Map<InternalTopicConfig, Set<TaskId>> topicToTaskIds,
-                                                            boolean postPartitionPhase) {
-        Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
-        // if ZK is specified, prepare the internal source topic before calling partition grouper
-        if (internalTopicManager != null) {
-            log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
-
-            for (Map.Entry<InternalTopicConfig, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
-                InternalTopicConfig topic = entry.getKey();
-                int numPartitions = 0;
-                if (postPartitionPhase) {
-                    // the expected number of partitions is the max value of TaskId.partition + 1
-                    for (TaskId task : entry.getValue()) {
-                        if (numPartitions < task.partition + 1)
-                            numPartitions = task.partition + 1;
-                    }
-                } else {
-                    // should have size 1 only
-                    numPartitions = -1;
-                    for (TaskId task : entry.getValue()) {
-                        numPartitions = task.partition;
-                    }
-                }
-
-                internalTopicManager.makeReady(topic, numPartitions);
-
-                // wait until the topic metadata has been propagated to all brokers
-                List<PartitionInfo> partitions;
-                do {
-                    partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
-                } while (partitions == null || partitions.size() != numPartitions);
-
-                for (PartitionInfo partition : partitions)
-                    partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
-            }
-
-            log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
-        } else {
-            List<String> missingTopics = new ArrayList<>();
-            for (InternalTopicConfig topic : topicToTaskIds.keySet()) {
-                List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
-                if (partitions == null) {
-                    missingTopics.add(topic.name());
-                }
-            }
-            if (!missingTopics.isEmpty()) {
-                log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
-                        streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
-
-            }
-        }
-
-        return partitionInfos;
-    }
-
     @Override
     public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
-        // This assigns tasks to consumer clients in two steps.
-        // 1. using TaskAssignor to assign tasks to consumer clients.
-        //    - Assign a task to a client which was running it previously.
-        //      If there is no such client, assign a task to a client which has its valid local state.
-        //    - A client may have more than one stream threads.
-        //      The assignor tries to assign tasks to a client proportionally to the number of threads.
-        //    - We try not to assign the same set of tasks to two different clients
-        //    We do the assignment in one-pass. The result may not satisfy above all.
-        // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
-        Map<UUID, Set<String>> consumersByClient = new HashMap<>();
-        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-        Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
-        // decode subscription info
+
+        // construct the client metadata from the decoded subscription info
+        Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
 
-
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-            if (info.userEndPoint != null) {
-                final String[] hostPort = info.userEndPoint.split(":");
-                consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
-            }
-            Set<String> consumers = consumersByClient.get(info.processId);
-            if (consumers == null) {
-                consumers = new HashSet<>();
-                consumersByClient.put(info.processId, consumers);
-            }
-            consumers.add(consumerId);
 
-            ClientState<TaskId> state = states.get(info.processId);
-            if (state == null) {
-                state = new ClientState<>();
-                states.put(info.processId, state);
+            // create the new client metadata if necessary
+            ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
+
+            if (clientMetadata == null) {
+                clientMetadata = new ClientMetadata(info.userEndPoint);
+                clientsMetadata.put(info.processId, clientMetadata);
             }
 
-            state.prevActiveTasks.addAll(info.prevTasks);
-            state.prevAssignedTasks.addAll(info.prevTasks);
-            state.prevAssignedTasks.addAll(info.standbyTasks);
-            state.capacity = state.capacity + 1d;
+            // add the consumer to the client
+            clientMetadata.addConsumer(consumerId, info);
         }
 
+        log.info("stream-thread [{}] Constructed client metadata {} from the member subscriptions.", streamThread.getName(), clientsMetadata);
 
-        this.topicGroups = streamThread.builder.topicGroups();
+        // ---------------- Step Zero ---------------- //
 
-        // ensure the co-partitioning topics within the group have the same number of partitions,
-        // and enforce the number of partitions for those internal topics.
-        internalSourceTopicToTaskIds = new HashMap<>();
-        Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
-        Map<Integer, Collection<InternalTopicConfig>> internalSourceTopicGroups = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
-            internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics.values());
+        // parse the topology to determine the repartition source topics,
+        // making sure they are created with the number of partitions as
+        // the maximum of the depending sub-topologies source topics' number of partitions
+        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+
+        Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
+        for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+            for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
+                repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
+            }
         }
 
+        boolean numPartitionsNeeded;
+        do {
+            numPartitionsNeeded = false;
+
+            for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+                for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                    int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
+
+                    // try set the number of partitions for this repartition topic if it is not set yet
+                    if (numPartitions == -1) {
+                        for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+                            Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
+
+                            if (otherSinkTopics.contains(topicName)) {
+                                // if this topic is one of the sink topics of this topology,
+                                // use the maximum of all its source topic partitions as the number of partitions
+                                for (String sourceTopicName : otherTopicsInfo.sourceTopics) {
+                                    Integer numPartitionsCandidate;
+                                    // It is possible the sourceTopic is another internal topic, i.e,
+                                    // map().join().join(map())
+                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
+                                        numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
+                                    } else {
+                                        numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
+                                    }
 
-        // for all internal source topics
-        // set the number of partitions to the maximum of the depending sub-topologies source topics
-        Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
-        Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            Map<String, InternalTopicConfig> internalTopics = entry.getValue().interSourceTopics;
-            allInternalTopics.putAll(internalTopics);
-            for (InternalTopicConfig internalTopic : internalTopics.values()) {
-                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
-
-                if (tasks == null) {
-                    int numPartitions = -1;
-                    for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : topicGroups.entrySet()) {
-                        Set<String> otherSinkTopics = other.getValue().sinkTopics;
-
-                        if (otherSinkTopics.contains(internalTopic.name())) {
-                            for (String topic : other.getValue().sourceTopics) {
-                                Integer partitions = null;
-                                // It is possible the sourceTopic is another internal topic, i.e,
-                                // map().join().join(map())
-                                if (allInternalTopics.containsKey(topic)) {
-                                    Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(allInternalTopics.get(topic));
-                                    if (taskIds != null) {
-                                        for (TaskId taskId : taskIds) {
-                                            partitions = taskId.partition;
-                                        }
+                                    if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) {
+                                        numPartitions = numPartitionsCandidate;
                                     }
-                                } else {
-                                    partitions = metadata.partitionCountForTopic(topic);
-                                }
-                                if (partitions != null && partitions > numPartitions) {
-                                    numPartitions = partitions;
                                 }
                             }
                         }
-                    }
-                    internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
-                    for (int partition = 0; partition < numPartitions; partition++) {
-                        internalPartitionInfos.put(new TopicPartition(internalTopic.name(), partition),
-                                                   new PartitionInfo(internalTopic.name(), partition, null, new Node[0], new Node[0]));
+
+                        // if we still have not find the right number of partitions,
+                        // another iteration is needed
+                        if (numPartitions == -1)
+                            numPartitionsNeeded = true;
+                        else
+                            repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
                     }
                 }
             }
+        } while (numPartitionsNeeded);
+
+        // augment the metadata with the newly computed number of partitions for all the
+        // repartition source topics
+        Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
+        for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
+            String topic = entry.getKey();
+            Integer numPartitions = entry.getValue().numPartitions;
+
+            for (int partition = 0; partition < numPartitions; partition++) {
+                allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
+                        new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
+            }
         }
 
+        // ensure the co-partitioning topics within the group have the same number of partitions,
+        // and enforce the number of partitions for those repartition topics to be the same if they
+        // are co-partitioned as well.
+        ensureCopartitioning(streamThread.builder.copartitionGroups(), repartitionTopicMetadata, metadata);
 
-        Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
-        ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups,
-                             metadata.withPartitions(internalPartitionInfos));
-
-
-        internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false);
-        internalSourceTopicToTaskIds.clear();
+        // make sure the repartition source topics exist with the right number of partitions,
+        // create these topics if necessary
+        prepareTopic(repartitionTopicMetadata);
 
         metadataWithInternalTopics = metadata;
         if (internalTopicManager != null)
-            metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
+            metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
+
+        log.debug("stream-thread [{}] Created repartition topics {} from the parsed topology.", streamThread.getName(), allRepartitionTopicPartitions.values());
+
+        // ---------------- Step One ---------------- //
 
         // get the tasks as partition groups from the partition grouper
-        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
-                sourceTopicGroups, metadataWithInternalTopics);
+        Set<String> allSourceTopics = new HashSet<>();
+        Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            allSourceTopics.addAll(entry.getValue().sourceTopics);
+            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
+        }
 
-        // add tasks to state change log topic subscribers
-        stateChangelogTopicToTaskIds = new HashMap<>();
-        for (TaskId task : partitionsForTask.keySet()) {
-            final Map<String, InternalTopicConfig> stateChangelogTopics = topicGroups.get(task.topicGroupId).stateChangelogTopics;
-            for (InternalTopicConfig topic : stateChangelogTopics.values()) {
-                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topic);
-                if (tasks == null) {
-                    tasks = new HashSet<>();
-                    stateChangelogTopicToTaskIds.put(topic, tasks);
+        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
+                sourceTopicsByGroup, metadataWithInternalTopics);
+
+        // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
+        Set<TopicPartition> allAssignedPartitions = new HashSet<>();
+        Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+            Set<TopicPartition> partitions = entry.getValue();
+            for (TopicPartition partition : partitions) {
+                if (allAssignedPartitions.contains(partition)) {
+                    log.warn("stream-thread [{}] Partition {} is assigned to more than one tasks: {}", streamThread.getName(), partition, partitionsForTask);
                 }
+            }
+            allAssignedPartitions.addAll(partitions);
 
-                tasks.add(task);
+            TaskId id = entry.getKey();
+            Set<TaskId> ids = tasksByTopicGroup.get(id.topicGroupId);
+            if (ids == null) {
+                ids = new HashSet<>();
+                tasksByTopicGroup.put(id.topicGroupId, ids);
+            }
+            ids.add(id);
+        }
+        for (String topic : allSourceTopics) {
+            for (PartitionInfo partitionInfo : metadataWithInternalTopics.partitionsForTopic(topic)) {
+                TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+                if (!allAssignedPartitions.contains(partition)) {
+                    log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", streamThread.getName(), partition, partitionsForTask);
+                }
             }
+        }
 
-            final Map<String, InternalTopicConfig> interSourceTopics = topicGroups.get(task.topicGroupId).interSourceTopics;
-            for (InternalTopicConfig topic : interSourceTopics.values()) {
-                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topic);
-                if (tasks == null) {
-                    tasks = new HashSet<>();
-                    internalSourceTopicToTaskIds.put(topic, tasks);
+        // add tasks to state change log topic subscribers
+        Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            final int topicGroupId = entry.getKey();
+            final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
+
+            for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
+                // the expected number of partitions is the max value of TaskId.partition + 1
+                int numPartitions = -1;
+                for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
+                    if (numPartitions < task.partition + 1)
+                        numPartitions = task.partition + 1;
                 }
 
-                tasks.add(task);
+                InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
+                topicMetadata.numPartitions = numPartitions;
+
+                changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
             }
         }
 
+        prepareTopic(changelogTopicMetadata);
+
+        log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", streamThread.getName(), changelogTopicMetadata);
+
+        // ---------------- Step Two ---------------- //
+
         // assign tasks to clients
-        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas, streamThread.getName());
+        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+            states.put(entry.getKey(), entry.getValue().state);
+        }
+
+        log.debug("stream-thread [{}] Assigning tasks {} to clients {} with number of replicas {}",
+                streamThread.getName(), partitionsForTask.keySet(), states, numStandbyReplicas);
+
+        TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+
+        log.info("stream-thread [{}] Assigned tasks to clients as {}.", streamThread.getName(), states);
 
-        final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>();
+        // ---------------- Step Three ---------------- //
+
+        // construct the global partition assignment per host map
+        partitionsByHostState = new HashMap<>();
+        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+            HostInfo hostInfo = entry.getValue().hostInfo;
+
+            if (hostInfo != null) {
+                final Set<TopicPartition> topicPartitions = new HashSet<>();
+                final ClientState<TaskId> state = entry.getValue().state;
+
+                for (TaskId id : state.assignedTasks) {
+                    topicPartitions.addAll(partitionsForTask.get(id));
+                }
+
+                partitionsByHostState.put(hostInfo, topicPartitions);
+            }
+        }
 
-        final Map<HostInfo, Set<TopicPartition>> endPointMap = new HashMap<>();
-        for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
-            UUID processId = entry.getKey();
-            Set<String> consumers = entry.getValue();
-            ClientState<TaskId> state = states.get(processId);
+        // within the client, distribute tasks to its owned consumers
+        Map<String, Assignment> assignment = new HashMap<>();
+        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+            Set<String> consumers = entry.getValue().consumers;
+            ClientState<TaskId> state = entry.getValue().state;
 
             ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
             final int numActiveTasks = state.activeTasks.size();
-            for (TaskId taskId : state.activeTasks) {
-                taskIds.add(taskId);
-            }
-            for (TaskId id : state.assignedTasks) {
-                if (!state.activeTasks.contains(id))
-                    taskIds.add(id);
-            }
 
-            final int numConsumers = consumers.size();
+            taskIds.addAll(state.activeTasks);
+            taskIds.addAll(state.standbyTasks);
 
+            final int numConsumers = consumers.size();
 
             int i = 0;
             for (String consumer : consumers) {
@@ -448,100 +517,58 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 for (AssignedPartition partition : assignedPartitions) {
                     active.add(partition.taskId);
                     activePartitions.add(partition.partition);
-                    HostInfo hostInfo = consumerEndPointMap.get(processId);
-                    if (hostInfo != null) {
-                        if (!endPointMap.containsKey(hostInfo)) {
-                            endPointMap.put(hostInfo, new HashSet<TopicPartition>());
-                        }
-                        final Set<TopicPartition> topicPartitions = endPointMap.get(hostInfo);
-                        topicPartitions.add(partition.partition);
-                    }
                 }
 
-
-                assignmentSuppliers.add(new AssignmentSupplier(consumer,
-                                                               active,
-                                                               standby,
-                                                               endPointMap,
-                                                               activePartitions));
+                // finally, encode the assignment before sending back to coordinator
+                assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
 
                 i++;
             }
         }
 
-        // if ZK is specified, validate the internal topics again
-        prepareTopic(internalSourceTopicToTaskIds,  /* compactTopic */ true);
-        // change log topics should be compacted
-        prepareTopic(stateChangelogTopicToTaskIds,  /* compactTopic */ true);
-
-        Map<String, Assignment> assignment = new HashMap<>();
-        for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) {
-            assignment.put(assignmentSupplier.consumer, assignmentSupplier.get());
-        }
         return assignment;
     }
 
-    class AssignmentSupplier {
-        private final String consumer;
-        private final List<TaskId> active;
-        private final Map<TaskId, Set<TopicPartition>> standby;
-        private final Map<HostInfo, Set<TopicPartition>> endPointMap;
-        private final List<TopicPartition> activePartitions;
-
-        AssignmentSupplier(final String consumer,
-                           final List<TaskId> active,
-                           final Map<TaskId, Set<TopicPartition>> standby,
-                           final Map<HostInfo, Set<TopicPartition>> endPointMap,
-                           final List<TopicPartition> activePartitions) {
-            this.consumer = consumer;
-            this.active = active;
-            this.standby = standby;
-            this.endPointMap = endPointMap;
-            this.activePartitions = activePartitions;
-        }
-
-        Assignment get() {
-            return new Assignment(activePartitions, new AssignmentInfo(active,
-                                                                       standby,
-                                                                       endPointMap).encode());
-        }
-    }
-
     /**
      * @throws TaskAssignmentException if there is no task id for one of the partitions specified
      */
     @Override
     public void onAssignment(Assignment assignment) {
         List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
-
         Collections.sort(partitions, PARTITION_COMPARATOR);
 
         AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
         this.standbyTasks = info.standbyTasks;
+        this.activeTasks = new HashMap<>();
+
+        // the number of assigned partitions should be the same as number of active tasks, which
+        // could be duplicated if one task has more than one assigned partitions
+        if (partitions.size() != info.activeTasks.size()) {
+            throw new TaskAssignmentException(
+                    String.format("stream-thread [%s] Number of assigned partitions %d is not equal to the number of active taskIds %d" +
+                            ", assignmentInfo=%s", streamThread.getName(), partitions.size(), info.activeTasks.size(), info.toString())
+            );
+        }
 
-        Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
-        Iterator<TaskId> iter = info.activeTasks.iterator();
-        for (TopicPartition partition : partitions) {
-            Set<TaskId> taskIds = partitionToTaskIds.get(partition);
-            if (taskIds == null) {
-                taskIds = new HashSet<>();
-                partitionToTaskIds.put(partition, taskIds);
-            }
+        for (int i = 0; i < partitions.size(); i++) {
+            TopicPartition partition = partitions.get(i);
+            TaskId id = info.activeTasks.get(i);
 
-            if (iter.hasNext()) {
-                taskIds.add(iter.next());
-            } else {
-                TaskAssignmentException ex = new TaskAssignmentException(
-                        String.format("stream-thread [%s] failed to find a task id for the partition=%s" +
-                        ", partitions=%d, assignmentInfo=%s", streamThread.getName(), partition.toString(), partitions.size(), info.toString())
-                );
-                log.error(ex.getMessage(), ex);
-                throw ex;
+            Set<TopicPartition> assignedPartitions = activeTasks.get(id);
+            if (assignedPartitions == null) {
+                assignedPartitions = new HashSet<>();
+                activeTasks.put(id, assignedPartitions);
             }
+            assignedPartitions.add(partition);
+        }
+
+        // only need to update the host partitions map if it is not leader
+        if (this.partitionsByHostState == null) {
+            this.partitionsByHostState = info.partitionsByHost;
         }
-        this.partitionToTaskIds = partitionToTaskIds;
-        this.partitionsByHostState = info.partitionsByHostState;
-        // only need to build when not coordinator
+
+        // only need to build if it is not leader
         if (metadataWithInternalTopics == null) {
             final Collection<Set<TopicPartition>> values = partitionsByHostState.values();
             final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
@@ -558,92 +585,129 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
     }
 
-    public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
-        if (partitionsByHostState == null) {
-            return Collections.emptyMap();
-        }
-        return Collections.unmodifiableMap(partitionsByHostState);
-    }
+    /**
+     * Internal helper function that creates a Kafka topic
+     *
+     * @param topicPartitions Map that contains the topic names to be created with the number of partitions
+     */
+    private void prepareTopic(Map<String, InternalTopicMetadata> topicPartitions) {
+        log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
 
-    public Cluster clusterMetadata() {
-        if (metadataWithInternalTopics == null) {
-            return Cluster.empty();
-        }
-        return metadataWithInternalTopics;
-    }
+        // if ZK is specified, prepare the internal source topic before calling partition grouper
+        if (internalTopicManager != null) {
+            for (Map.Entry<String, InternalTopicMetadata> entry : topicPartitions.entrySet()) {
+                InternalTopicConfig topic = entry.getValue().config;
+                Integer numPartitions = entry.getValue().numPartitions;
 
-    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Collection<InternalTopicConfig>> internalTopicGroups, Cluster metadata) {
-        Map<String, InternalTopicConfig> internalTopics = new HashMap<>();
-        for (Collection<InternalTopicConfig> topics : internalTopicGroups.values()) {
-            for (InternalTopicConfig topic : topics) {
-                internalTopics.put(topic.name(), topic);
+                if (numPartitions < 0)
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+
+                internalTopicManager.makeReady(topic, numPartitions);
+
+                // wait until the topic metadata has been propagated to all brokers
+                List<PartitionInfo> partitions;
+                do {
+                    partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
+                } while (partitions == null || partitions.size() != numPartitions);
+            }
+        } else {
+            List<String> missingTopics = new ArrayList<>();
+            for (String topic : topicPartitions.keySet()) {
+                List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                if (partitions == null) {
+                    missingTopics.add(topic);
+                }
+            }
+
+            if (!missingTopics.isEmpty()) {
+                log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
+                        streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
             }
         }
 
+        log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
+    }
+
+    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
+                                      Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+                                      Cluster metadata) {
         for (Set<String> copartitionGroup : copartitionGroups) {
-            ensureCopartitioning(copartitionGroup, internalTopics, metadata);
+            ensureCopartitioning(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
         }
     }
 
-    private void ensureCopartitioning(Set<String> copartitionGroup, Map<String, InternalTopicConfig> internalTopics, Cluster metadata) {
+    private void ensureCopartitioning(Set<String> copartitionGroup,
+                                      Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+                                      Cluster metadata) {
         int numPartitions = -1;
 
         for (String topic : copartitionGroup) {
-            if (!internalTopics.containsKey(topic)) {
-                List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
+            if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
+                Integer partitions = metadata.partitionCountForTopic(topic);
 
-                if (infos == null)
-                    throw new TopologyBuilderException(String.format("stream-thread [%s] External source topic not found: %s", streamThread.getName(), topic));
+                if (partitions == null)
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", streamThread.getName(), topic));
 
                 if (numPartitions == -1) {
-                    numPartitions = infos.size();
-                } else if (numPartitions != infos.size()) {
+                    numPartitions = partitions;
+                } else if (numPartitions != partitions) {
                     String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                     Arrays.sort(topics);
-                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not copartitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
                 }
             }
         }
 
+        // if all topics for this co-partition group is repartition topics,
+        // then set the number of partitions to be the maximum of the number of partitions.
         if (numPartitions == -1) {
-            for (InternalTopicConfig topic : internalTopics.values()) {
-                if (copartitionGroup.contains(topic.name())) {
-                    Integer partitions = metadata.partitionCountForTopic(topic.name());
-                    if (partitions != null && partitions > numPartitions) {
+            for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
+                if (copartitionGroup.contains(entry.getKey())) {
+                    int partitions = entry.getValue().numPartitions;
+                    if (partitions > numPartitions) {
                         numPartitions = partitions;
                     }
                 }
             }
         }
-        // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
-        for (InternalTopicConfig topic : internalTopics.values()) {
-            if (copartitionGroup.contains(topic.name())) {
-                internalSourceTopicToTaskIds
-                    .put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+
+        // enforce co-partitioning restrictions to repartition topics by updating their number of partitions
+        for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
+            if (copartitionGroup.contains(entry.getKey())) {
+                entry.getValue().numPartitions = numPartitions;
             }
         }
     }
 
-    /* For Test Only */
-    public Set<TaskId> tasksForState(String stateName) {
-        final String changeLogName = ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName);
-        for (InternalTopicConfig internalTopicConfig : stateChangelogTopicToTaskIds.keySet()) {
-            if (internalTopicConfig.name().equals(changeLogName)) {
-                return stateChangelogTopicToTaskIds.get(internalTopicConfig);
-            }
+    Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
+        if (partitionsByHostState == null) {
+            return Collections.emptyMap();
         }
-        return Collections.emptySet();
+        return Collections.unmodifiableMap(partitionsByHostState);
+    }
+
+    Cluster clusterMetadata() {
+        if (metadataWithInternalTopics == null) {
+            return Cluster.empty();
+        }
+        return metadataWithInternalTopics;
     }
 
-    public Set<TaskId> tasksForPartition(TopicPartition partition) {
-        return partitionToTaskIds.get(partition);
+    Map<TaskId, Set<TopicPartition>> activeTasks() {
+        if (activeTasks == null) {
+            return Collections.emptyMap();
+        }
+        return Collections.unmodifiableMap(activeTasks);
     }
 
-    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-        return standbyTasks;
+    Map<TaskId, Set<TopicPartition>> standbyTasks() {
+        if (standbyTasks == null) {
+            return Collections.emptyMap();
+        }
+        return Collections.unmodifiableMap(standbyTasks);
     }
 
-    public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
     }
 
@@ -651,11 +715,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      * Used to capture subscribed topic via Patterns discovered during the
      * partition assignment process.
      */
-    public static  class SubscriptionUpdates {
+    public static class SubscriptionUpdates {
 
         private final Set<String> updatedTopicSubscriptions = new HashSet<>();
 
-
         private  void updateTopics(Collection<String> topicNames) {
             updatedTopicSubscriptions.clear();
             updatedTopicSubscriptions.addAll(topicNames);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d7bb98c..7a04339 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -111,7 +111,6 @@ public class StreamThread extends Thread {
     private boolean processStandbyRecords = false;
     private AtomicBoolean initialized = new AtomicBoolean(false);
 
-    private final long cacheSizeBytes;
     private ThreadCache cache;
 
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@@ -184,7 +183,7 @@ public class StreamThread extends Thread {
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
             log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadName);
         }
-        this.cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
+        long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
             config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.sensors);
 
@@ -637,34 +636,24 @@ public class StreamThread extends Thread {
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
 
-        HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
-
-        for (TopicPartition partition : assignment) {
-            Set<TaskId> taskIds = partitionAssignor.tasksForPartition(partition);
-            for (TaskId taskId : taskIds) {
-                Set<TopicPartition> partitions = partitionsForTask.get(taskId);
-                if (partitions == null) {
-                    partitions = new HashSet<>();
-                    partitionsForTask.put(taskId, partitions);
-                }
-                partitions.add(partition);
-            }
-        }
-
         // create the active tasks
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
             TaskId taskId = entry.getKey();
             Set<TopicPartition> partitions = entry.getValue();
 
-            try {
-                StreamTask task = createStreamTask(taskId, partitions);
-                activeTasks.put(taskId, task);
+            if (assignment.containsAll(partitions)) {
+                try {
+                    StreamTask task = createStreamTask(taskId, partitions);
+                    activeTasks.put(taskId, task);
 
-                for (TopicPartition partition : partitions)
-                    activeTasksByPartition.put(partition, task);
-            } catch (StreamsException e) {
-                log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);
-                throw e;
+                    for (TopicPartition partition : partitions)
+                        activeTasksByPartition.put(partition, task);
+                } catch (StreamsException e) {
+                    log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);
+                    throw e;
+                }
+            } else {
+                log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", logPrefix, taskId, partitions, assignment);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 6569f85..ddbd67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -41,7 +41,7 @@ public class AssignmentInfo {
 
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
     /**
-     * A new field was added, partitionsByHostState. CURRENT_VERSION
+     * A new field was added, partitionsByHost. CURRENT_VERSION
      * is required so we can decode the previous version. For example, this may occur
      * during a rolling upgrade
      */
@@ -49,7 +49,7 @@ public class AssignmentInfo {
     public final int version;
     public final List<TaskId> activeTasks; // each element corresponds to a partition
     public final Map<TaskId, Set<TopicPartition>> standbyTasks;
-    public final Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+    public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
     public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
                           Map<HostInfo, Set<TopicPartition>> hostState) {
@@ -61,7 +61,7 @@ public class AssignmentInfo {
         this.version = version;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
-        this.partitionsByHostState = hostState;
+        this.partitionsByHost = hostState;
     }
 
     /**
@@ -89,8 +89,8 @@ public class AssignmentInfo {
                 Set<TopicPartition> partitions = entry.getValue();
                 writeTopicPartitions(out, partitions);
             }
-            out.writeInt(partitionsByHostState.size());
-            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHostState
+            out.writeInt(partitionsByHost.size());
+            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost
                     .entrySet()) {
                 final HostInfo hostInfo = entry.getKey();
                 out.writeUTF(hostInfo.host());
@@ -174,7 +174,7 @@ public class AssignmentInfo {
 
     @Override
     public int hashCode() {
-        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHostState.hashCode();
+        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
     }
 
     @Override
@@ -184,7 +184,7 @@ public class AssignmentInfo {
             return this.version == other.version &&
                     this.activeTasks.equals(other.activeTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
-                    this.partitionsByHostState.equals(other.partitionsByHostState);
+                    this.partitionsByHost.equals(other.partitionsByHost);
         } else {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index b59af86..0746cab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -22,11 +22,12 @@ import java.util.Set;
 
 public class ClientState<T> {
 
-    public final static double COST_ACTIVE = 0.1;
-    public final static double COST_STANDBY  = 0.2;
-    public final static double COST_LOAD = 0.5;
+    final static double COST_ACTIVE = 0.1;
+    final static double COST_STANDBY  = 0.2;
+    final static double COST_LOAD = 0.5;
 
     public final Set<T> activeTasks;
+    public final Set<T> standbyTasks;
     public final Set<T> assignedTasks;
     public final Set<T> prevActiveTasks;
     public final Set<T> prevAssignedTasks;
@@ -39,11 +40,12 @@ public class ClientState<T> {
     }
 
     public ClientState(double capacity) {
-        this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
+        this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
     }
 
-    private ClientState(Set<T> activeTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
+    private ClientState(Set<T> activeTasks, Set<T> standbyTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
         this.activeTasks = activeTasks;
+        this.standbyTasks = standbyTasks;
         this.assignedTasks = assignedTasks;
         this.prevActiveTasks = prevActiveTasks;
         this.prevAssignedTasks = prevAssignedTasks;
@@ -52,13 +54,15 @@ public class ClientState<T> {
     }
 
     public ClientState<T> copy() {
-        return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(assignedTasks),
+        return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(standbyTasks), new HashSet<>(assignedTasks),
                 new HashSet<>(prevActiveTasks), new HashSet<>(prevAssignedTasks), capacity);
     }
 
     public void assign(T taskId, boolean active) {
         if (active)
             activeTasks.add(taskId);
+        else
+            standbyTasks.add(taskId);
 
         assignedTasks.add(taskId);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index fadb43f..e807c4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
@@ -33,24 +32,20 @@ public class TaskAssignor<C, T extends Comparable<T>> {
 
     private static final Logger log = LoggerFactory.getLogger(TaskAssignor.class);
 
-    public static <C, T extends Comparable<T>> Map<C, ClientState<T>> assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas, String streamThreadId) {
+    public static <C, T extends Comparable<T>> void assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas) {
         long seed = 0L;
         for (C client : states.keySet()) {
             seed += client.hashCode();
         }
 
         TaskAssignor<C, T> assignor = new TaskAssignor<>(states, tasks, seed);
-        log.info("stream-thread [{}] Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " +
-            "prevClientsUnchanged: {}, tasks: {}, replicas: {}",
-            streamThreadId, states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged,
-            tasks, numStandbyReplicas);
 
+        // assign active tasks
         assignor.assignTasks();
+
+        // assign standby tasks
         if (numStandbyReplicas > 0)
             assignor.assignStandbyTasks(numStandbyReplicas);
-
-        log.info("stream-thread [{}] Assigned with: {}", streamThreadId, assignor.states);
-        return assignor.states;
     }
 
     private final Random rand;
@@ -63,36 +58,38 @@ public class TaskAssignor<C, T extends Comparable<T>> {
 
     private TaskAssignor(Map<C, ClientState<T>> states, Set<T> tasks, long randomSeed) {
         this.rand = new Random(randomSeed);
-        this.states = new HashMap<>();
+        this.tasks = new ArrayList<>(tasks);
+        this.states = states;
+
         int avgNumTasks = tasks.size() / states.size();
         Set<T> existingTasks = new HashSet<>();
         for (Map.Entry<C, ClientState<T>> entry : states.entrySet()) {
-            this.states.put(entry.getKey(), entry.getValue().copy());
             Set<T> oldTasks = entry.getValue().prevAssignedTasks;
+
             // make sure the previous assignment is balanced
             prevAssignmentBalanced = prevAssignmentBalanced &&
                 oldTasks.size() < 2 * avgNumTasks && oldTasks.size() > avgNumTasks / 2;
+
+            // make sure there are no duplicates
             for (T task : oldTasks) {
-                // Make sure there is no duplicates
                 prevClientsUnchanged = prevClientsUnchanged && !existingTasks.contains(task);
             }
             existingTasks.addAll(oldTasks);
         }
-        // Make sure the existing assignment didn't miss out any task
-        prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
 
-        this.tasks = new ArrayList<>(tasks);
+        // make sure the existing assignment didn't miss out any task
+        prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
 
         int numTasks = tasks.size();
         this.maxNumTaskPairs = numTasks * (numTasks - 1) / 2;
         this.taskPairs = new HashSet<>(this.maxNumTaskPairs);
     }
 
-    public void assignTasks() {
+    private void assignTasks() {
         assignTasks(true);
     }
 
-    public void assignStandbyTasks(int numStandbyReplicas) {
+    private void assignStandbyTasks(int numStandbyReplicas) {
         int numReplicas = Math.min(numStandbyReplicas, states.size() - 1);
         for (int i = 0; i < numReplicas; i++) {
             assignTasks(false);
@@ -195,10 +192,10 @@ public class TaskAssignor<C, T extends Comparable<T>> {
     }
 
     private static class TaskPair<T> {
-        public final T task1;
-        public final T task2;
+        final T task1;
+        final T task2;
 
-        public TaskPair(T task1, T task2) {
+        TaskPair(T task1, T task2) {
             this.task1 = task1;
             this.task2 = task2;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 37a15e1..ce75d91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -74,7 +74,7 @@ public class HostInfo {
     @Override
     public String toString() {
         return "HostInfo{" +
-                "host='" + host + '\'' +
+                "host=\'" + host + '\'' +
                 ", port=" + port +
                 '}';
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 41f9ae2..83e3d10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -120,14 +120,4 @@ public class StoreChangeLogger<K, V> {
         this.removed.clear();
         this.dirty.clear();
     }
-
-    // this is for test only
-    public int numDirty() {
-        return this.dirty.size();
-    }
-
-    // this is for test only
-    public int numRemoved() {
-        return this.removed.size();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index a5fb076..03df4cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -85,7 +85,6 @@ public class FanoutIntegrationTest {
         CLUSTER.createTopic(OUTPUT_TOPIC_C);
     }
 
-
     @Parameter
     public long cacheSizeBytes;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index e5e334c..8c96ecb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -206,7 +206,7 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldCantHaveNullPredicate() throws Exception {
-        testStream.branch(null);
+        testStream.branch((Predicate) null);
     }
 
     @Test(expected = NullPointerException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index afa1033..49dcbd0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -40,6 +40,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
 import java.io.File;
 import java.io.IOException;
 
@@ -337,7 +338,7 @@ public class KTableImplTest {
     }
 
     @Test
-    public void testRepartition() throws IOException {
+    public void testRepartition() throws Exception {
         String topic1 = "topic1";
         String storeName1 = "storeName1";
 
@@ -367,10 +368,15 @@ public class KTableImplTest {
         assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
         assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
 
-        assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
-        assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
-        assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
-        assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
+        Field valSerializerField  = ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
+        Field valDeserializerField  = ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+        valSerializerField.setAccessible(true);
+        valDeserializerField.setAccessible(true);
+
+        assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000003"))).inner());
+        assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000004"))).inner());
+        assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000007"))).inner());
+        assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000008"))).inner());
     }
 
     @Test(expected = NullPointerException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index d260937..c402c9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -156,7 +156,7 @@ public class TopologyBuilderTest {
         builder.addSource("source-3", "topic-3");
         builder.addInternalTopic("topic-3");
 
-        Set<String> expected = new HashSet<String>();
+        Set<String> expected = new HashSet<>();
         expected.add("topic-1");
         expected.add("topic-2");
         expected.add("X-topic-3");
@@ -516,7 +516,7 @@ public class TopologyBuilderTest {
         builder.addInternalTopic("foo");
         builder.addSource("source", "foo");
         final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.interSourceTopics.get("appId-foo");
+        final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Properties properties = topicConfig.toProperties(0);
         assertEquals("appId-foo", topicConfig.name());
         assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));