You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/28 17:37:33 UTC

[2/3] kafka git commit: KAFKA-6170; KIP-220 Part 2: Break dependency of Assignor on StreamThread

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 0238615..1eecb73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -17,17 +17,24 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 
+import java.io.File;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.singleton;
@@ -37,6 +44,7 @@ class TaskManager {
     // activeTasks needs to be concurrent as it can be accessed
     // by QueryableState
     private final Logger log;
+    private final UUID processId;
     private final AssignedStreamsTasks active;
     private final AssignedStandbyTasks standby;
     private final ChangelogReader changelogReader;
@@ -44,18 +52,33 @@ class TaskManager {
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final StreamThread.AbstractTaskCreator<StreamTask> taskCreator;
     private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
-    private ThreadMetadataProvider threadMetadataProvider;
+    private final StreamsMetadataState streamsMetadataState;
+
+    // TODO: this is going to be replaced by AdminClient
+    final StreamsKafkaClient streamsKafkaClient;
+
+    // following information is updated during rebalance phase by the partition assignor
+    private Cluster cluster;
+    private Map<TaskId, Set<TopicPartition>> assignedActiveTasks;
+    private Map<TaskId, Set<TopicPartition>> assignedStandbyTasks;
+    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+
     private Consumer<byte[], byte[]> consumer;
 
     TaskManager(final ChangelogReader changelogReader,
+                final UUID processId,
                 final String logPrefix,
                 final Consumer<byte[], byte[]> restoreConsumer,
+                final StreamsMetadataState streamsMetadataState,
                 final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
                 final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
+                final StreamsKafkaClient streamsKafkaClient,
                 final AssignedStreamsTasks active,
                 final AssignedStandbyTasks standby) {
         this.changelogReader = changelogReader;
+        this.processId = processId;
         this.logPrefix = logPrefix;
+        this.streamsMetadataState = streamsMetadataState;
         this.restoreConsumer = restoreConsumer;
         this.taskCreator = taskCreator;
         this.standbyTaskCreator = standbyTaskCreator;
@@ -65,15 +88,14 @@ class TaskManager {
         final LogContext logContext = new LogContext(logPrefix);
 
         this.log = logContext.logger(getClass());
+
+        this.streamsKafkaClient = streamsKafkaClient;
     }
 
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void createTasks(final Collection<TopicPartition> assignment) {
-        if (threadMetadataProvider == null) {
-            throw new IllegalStateException(logPrefix + "taskIdProvider has not been initialized while adding stream tasks. This should not happen.");
-        }
         if (consumer == null) {
             throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
         }
@@ -81,8 +103,7 @@ class TaskManager {
         changelogReader.reset();
         // do this first as we may have suspended standby tasks that
         // will become active or vice versa
-        standby.closeNonAssignedSuspendedTasks(threadMetadataProvider.standbyTasks());
-        Map<TaskId, Set<TopicPartition>> assignedActiveTasks = threadMetadataProvider.activeTasks();
+        standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
         active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
         addStreamTasks(assignment);
         addStandbyTasks();
@@ -91,22 +112,17 @@ class TaskManager {
         consumer.pause(partitions);
     }
 
-    void setThreadMetadataProvider(final ThreadMetadataProvider threadMetadataProvider) {
-        this.threadMetadataProvider = threadMetadataProvider;
-    }
-
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
-        Map<TaskId, Set<TopicPartition>> assignedTasks = threadMetadataProvider.activeTasks();
-        if (assignedTasks.isEmpty()) {
+        if (assignedActiveTasks.isEmpty()) {
             return;
         }
         final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
         // collect newly assigned tasks and reopen re-assigned tasks
-        log.debug("Adding assigned tasks as active: {}", assignedTasks);
-        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedTasks.entrySet()) {
+        log.debug("Adding assigned tasks as active: {}", assignedActiveTasks);
+        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedActiveTasks.entrySet()) {
             final TaskId taskId = entry.getKey();
             final Set<TopicPartition> partitions = entry.getValue();
 
@@ -142,7 +158,7 @@ class TaskManager {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     private void addStandbyTasks() {
-        final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = threadMetadataProvider.standbyTasks();
+        final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = this.assignedStandbyTasks;
         if (assignedStandbyTasks.isEmpty()) {
             return;
         }
@@ -184,6 +200,44 @@ class TaskManager {
     }
 
     /**
+     * Returns ids of tasks whose states are kept on the local storage.
+     */
+    Set<TaskId> cachedTasksIds() {
+        // A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
+        // 1) the client is actively maintaining standby tasks by maintaining their states from the change log.
+        // 2) the client has just got some tasks migrated out of itself to other clients while these task states
+        //    have not been cleaned up yet (this can happen in a rolling bounce upgrade, for example).
+
+        final HashSet<TaskId> tasks = new HashSet<>();
+
+        final File[] stateDirs = taskCreator.stateDirectory().listTaskDirectories();
+        if (stateDirs != null) {
+            for (final File dir : stateDirs) {
+                try {
+                    final TaskId id = TaskId.parse(dir.getName());
+                    // if the checkpoint file exists, the state is valid.
+                    if (new File(dir, ProcessorStateManager.CHECKPOINT_FILE_NAME).exists()) {
+                        tasks.add(id);
+                    }
+                } catch (final TaskIdFormatException e) {
+                    // there may be some unknown files that sits in the same directory,
+                    // we should ignore these files instead trying to delete them as well
+                }
+            }
+        }
+
+        return tasks;
+    }
+
+    UUID processId() {
+        return processId;
+    }
+
+    InternalTopologyBuilder builder() {
+        return taskCreator.builder();
+    }
+
+    /**
      * Similar to shutdownTasksAndState, however does not close the task managers, in the hope that
      * soon the tasks will be assigned again
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
@@ -216,16 +270,14 @@ class TaskManager {
             firstException.compareAndSet(null, fatalException);
         }
         standby.close(clean);
-        try {
-            threadMetadataProvider.close();
-        } catch (final Throwable e) {
-            log.error("Failed to close KafkaStreamClient due to the following error:", e);
-        }
+
         // remove the changelog partitions from restore consumer
         restoreConsumer.unsubscribe();
         taskCreator.close();
         standbyTaskCreator.close();
 
+        streamsKafkaClient.close();
+
         final RuntimeException fatalException = firstException.get();
         if (fatalException != null) {
             throw fatalException;
@@ -311,6 +363,45 @@ class TaskManager {
         }
     }
 
+    void setClusterMetadata(final Cluster cluster) {
+        this.cluster = cluster;
+    }
+
+    void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) {
+        this.partitionsByHostState = partitionsByHostState;
+        this.streamsMetadataState.onChange(partitionsByHostState, cluster);
+    }
+
+    void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
+                               final Map<TaskId, Set<TopicPartition>> standbyTasks) {
+        this.assignedActiveTasks = activeTasks;
+        this.assignedStandbyTasks = standbyTasks;
+    }
+
+    void updateSubscriptionsFromAssignment(List<TopicPartition> partitions) {
+        if (builder().sourceTopicPattern() != null) {
+            final Set<String> assignedTopics = new HashSet<>();
+            for (final TopicPartition topicPartition : partitions) {
+                assignedTopics.add(topicPartition.topic());
+            }
+
+            final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
+            if (!existingTopics.containsAll(assignedTopics)) {
+                assignedTopics.addAll(existingTopics);
+                builder().updateSubscribedTopics(assignedTopics, logPrefix);
+            }
+        }
+    }
+
+    void updateSubscriptionsFromMetadata(Set<String> topics) {
+        if (builder().sourceTopicPattern() != null) {
+            final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
+            if (!existingTopics.equals(topics)) {
+                builder().updateSubscribedTopics(topics, logPrefix);
+            }
+        }
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -350,4 +441,13 @@ class TaskManager {
         builder.append(standby.toString(indent + "\t\t"));
         return builder.toString();
     }
+
+    // the following functions are for testing only
+    Map<TaskId, Set<TopicPartition>> assignedActiveTasks() {
+        return assignedActiveTasks;
+    }
+
+    Map<TaskId, Set<TopicPartition>> assignedStandbyTasks() {
+        return assignedStandbyTasks;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadDataProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadDataProvider.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadDataProvider.java
deleted file mode 100644
index ded98f7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadDataProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.TaskId;
-
-import java.util.Set;
-import java.util.UUID;
-
-// interface to get info about the StreamThread
-interface ThreadDataProvider {
-    InternalTopologyBuilder builder();
-    String name();
-    Set<TaskId> prevActiveTasks();
-    Set<TaskId> cachedTasks();
-    UUID processId();
-    StreamsConfig config();
-    PartitionGrouper partitionGrouper();
-    void setThreadMetadataProvider(final ThreadMetadataProvider provider);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataProvider.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataProvider.java
deleted file mode 100644
index f185045..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.HostInfo;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Interface used by a <code>StreamThread</code> to get metadata from the <code>StreamPartitionAssignor</code>
- */
-public interface ThreadMetadataProvider {
-    Map<TaskId, Set<TopicPartition>> standbyTasks();
-    Map<TaskId, Set<TopicPartition>> activeTasks();
-    Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState();
-    Cluster clusterMetadata();
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3774a8e..1a4cfb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -94,7 +94,7 @@ public class StreamsConfigTest {
     public void testGetConsumerConfigs() {
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, groupId, clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
         assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
@@ -147,7 +147,7 @@ public class StreamsConfigTest {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
         assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -166,7 +166,7 @@ public class StreamsConfigTest {
     public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
         assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
     }
 
@@ -202,7 +202,7 @@ public class StreamsConfigTest {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
         assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -248,7 +248,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
         assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
@@ -275,7 +275,7 @@ public class StreamsConfigTest {
     public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "a", "b");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("a", "b");
         assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
@@ -290,7 +290,7 @@ public class StreamsConfigTest {
     @Test
     public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
     }
 
@@ -319,7 +319,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -327,7 +327,7 @@ public class StreamsConfigTest {
     public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientrId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientrId");
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -371,7 +371,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
 
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 9c8244a..e9df495 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -262,12 +262,14 @@ public class QueryableStateIntegrationTest {
                 public boolean conditionMet() {
                     try {
                         final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+
                         if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
                             return false;
                         }
                         final int index = metadata.hostInfo().port();
                         final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
                         final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+
                         return store != null && store.get(key) != null;
                     } catch (final IllegalStateException e) {
                         // Kafka Streams instance may have closed but rebalance hasn't happened

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 42e5ccf..d21d2e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -681,7 +681,7 @@ public class TopologyBuilderTest {
         builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
         builder.addSource("source-3", Pattern.compile("topic-\\d"));
 
-        SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
         Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
 
@@ -761,7 +761,7 @@ public class TopologyBuilderTest {
                 .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
                 .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
 
-        final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
         final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 418b0ba..2bd2d42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -66,6 +66,7 @@ public class GlobalStreamThreadTest {
                                                     config,
                                                     mockConsumer,
                                                     new StateDirectory(config, time),
+                                                    0,
                                                     new Metrics(),
                                                     new MockTime(),
                                                     "clientId",
@@ -98,6 +99,7 @@ public class GlobalStreamThreadTest {
                                                     config,
                                                     mockConsumer,
                                                     new StateDirectory(config, time),
+                                                    0,
                                                     new Metrics(),
                                                     new MockTime(),
                                                     "clientId",

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 7d032a1..e914f9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -161,7 +161,7 @@ public class InternalTopicManagerTest {
         Map<String, Integer> replicationFactorPerTopic = new HashMap<>();
 
         MockStreamKafkaClient(final StreamsConfig streamsConfig) {
-            super(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig),
+            super(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig.originals()),
                   new MockClient(new MockTime()),
                   Collections.<MetricsReporter>emptyList(),
                   new LogContext());

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index fa83a71..0fdb575 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -633,7 +632,7 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-[A-C]"));
         builder.addSource(null, "source-3", null, null, null, Pattern.compile("topic-\\d"));
 
-        final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates();
         final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
 
@@ -721,7 +720,7 @@ public class InternalTopologyBuilderTest {
         builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest");
         builder.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
 
-        final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates();
         final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index cd37fab..99bb56d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -34,8 +34,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
-import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
@@ -45,7 +43,7 @@ import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
-import org.apache.kafka.test.MockTimestampExtractor;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -57,7 +55,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
@@ -65,7 +62,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 public class StreamPartitionAssignorTest {
@@ -110,43 +106,34 @@ public class StreamPartitionAssignorTest {
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps());
-    private final ThreadDataProvider threadDataProvider = EasyMock.createNiceMock(ThreadDataProvider.class);
-    private final Map<String, Object> configurationMap = new HashMap<>();
-    private final DefaultPartitionGrouper defaultPartitionGrouper = new DefaultPartitionGrouper();
-    private final SingleGroupPartitionGrouperStub stubPartitionGrouper = new SingleGroupPartitionGrouperStub();
     private final String userEndPoint = "localhost:8080";
+    private final String applicationId = "stream-partition-assignor-test";
 
-    private Properties configProps() {
-        return new Properties() {
-            {
-                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
-                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
-                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
-            }
-        };
+    private final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+
+    private Map<String, Object> configProps() {
+        Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
+        configurationMap.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        return configurationMap;
     }
 
-    private void configurePartitionAssignor(final int standbyReplicas, final String endPoint) {
-        configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, threadDataProvider);
-        configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, standbyReplicas);
-        configurationMap.put(StreamsConfig.APPLICATION_SERVER_CONFIG, endPoint);
+    private void configurePartitionAssignor(final Map<String, Object> props) {
+        Map<String, Object> configurationMap = configProps();
+        configurationMap.putAll(props);
         partitionAssignor.configure(configurationMap);
     }
 
-    private void mockThreadDataProvider(final Set<TaskId> prevTasks,
-                                        final Set<TaskId> cachedTasks,
-                                        final UUID processId,
-                                        final PartitionGrouper partitionGrouper,
-                                        final InternalTopologyBuilder builder) throws NoSuchFieldException, IllegalAccessException {
-        EasyMock.expect(threadDataProvider.name()).andReturn("name").anyTimes();
-        EasyMock.expect(threadDataProvider.prevActiveTasks()).andReturn(prevTasks).anyTimes();
-        EasyMock.expect(threadDataProvider.cachedTasks()).andReturn(cachedTasks).anyTimes();
-        EasyMock.expect(threadDataProvider.config()).andReturn(config).anyTimes();
-        EasyMock.expect(threadDataProvider.builder()).andReturn(builder).anyTimes();
-        EasyMock.expect(threadDataProvider.processId()).andReturn(processId).anyTimes();
-        EasyMock.expect(threadDataProvider.partitionGrouper()).andReturn(partitionGrouper).anyTimes();
-        EasyMock.replay(threadDataProvider);
+    private void mockTaskManager(final Set<TaskId> prevTasks,
+                                 final Set<TaskId> cachedTasks,
+                                 final UUID processId,
+                                 final InternalTopologyBuilder builder) throws NoSuchFieldException, IllegalAccessException {
+        EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes();
+        EasyMock.expect(taskManager.prevActiveTaskIds()).andReturn(prevTasks).anyTimes();
+        EasyMock.expect(taskManager.cachedTasksIds()).andReturn(cachedTasks).anyTimes();
+        EasyMock.expect(taskManager.processId()).andReturn(processId).anyTimes();
+        EasyMock.replay(taskManager);
     }
 
 
@@ -163,9 +150,9 @@ public class StreamPartitionAssignorTest {
                 new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
 
         final UUID processId = UUID.randomUUID();
-        mockThreadDataProvider(prevTasks, cachedTasks, processId, stubPartitionGrouper, builder);
+        mockTaskManager(prevTasks, cachedTasks, processId, builder);
 
-        configurePartitionAssignor(0, null);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
         PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
 
         Collections.sort(subscription.topics());
@@ -197,8 +184,8 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
 
-        mockThreadDataProvider(prevTasks10, standbyTasks10, uuid1, stubPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
+        mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
@@ -245,10 +232,6 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void testAssignWithPartialTopology() throws Exception {
-        Properties props = configProps();
-        props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class);
-        StreamsConfig config = new StreamsConfig(props);
-
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
         builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
@@ -260,10 +243,10 @@ public class StreamPartitionAssignorTest {
 
         UUID uuid1 = UUID.randomUUID();
 
-        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, stubPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
+        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, (Object) SingleGroupPartitionGrouperStub.class));
 
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer));
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
             new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
@@ -298,8 +281,8 @@ public class StreamPartitionAssignorTest {
             Collections.<String>emptySet());
         UUID uuid1 = UUID.randomUUID();
 
-        mockThreadDataProvider(prevTasks10, standbyTasks10, uuid1, stubPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
+        mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -353,8 +336,9 @@ public class StreamPartitionAssignorTest {
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
-        mockThreadDataProvider(prevTasks10, Collections.<TaskId>emptySet(), uuid1, stubPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
+        mockTaskManager(prevTasks10, Collections.<TaskId>emptySet(), uuid1, builder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -392,7 +376,6 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void testAssignWithStates() throws Exception {
-        String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
@@ -417,11 +400,11 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
 
-        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+        mockTaskManager(Collections.<TaskId>emptySet(),
                                Collections.<TaskId>emptySet(),
                                uuid1,
-                               defaultPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
+                builder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
@@ -481,8 +464,8 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void testAssignWithStandbyReplicas() throws Exception {
-        Properties props = configProps();
-        props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
+        Map<String, Object> props = configProps();
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         StreamsConfig config = new StreamsConfig(props);
 
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -502,9 +485,10 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
 
-        mockThreadDataProvider(prevTasks00, standbyTasks01, uuid1, defaultPartitionGrouper, builder);
+        mockTaskManager(prevTasks00, standbyTasks01, uuid1, builder);
+
+        configurePartitionAssignor(Collections.<String, Object>singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
 
-        configurePartitionAssignor(1, null);
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -552,35 +536,41 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void testOnAssignment() throws Exception {
-        TopicPartition t2p3 = new TopicPartition("topic2", 3);
-
-        builder.addSource(null, "source1", null, null, null, "topic1");
-        builder.addSource(null, "source2", null, null, null, "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-
-        UUID uuid = UUID.randomUUID();
-        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid, defaultPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
-
-        List<TaskId> activeTaskList = Utils.mkList(task0, task3);
-        Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-        activeTasks.put(task0, Utils.mkSet(t1p0));
-        activeTasks.put(task3, Utils.mkSet(t2p3));
-        standbyTasks.put(task1, Utils.mkSet(t1p0));
-        standbyTasks.put(task2, Utils.mkSet(t2p0));
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
+
+        final List<TaskId> activeTaskList = Utils.mkList(task0, task3);
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(
+                new HostInfo("localhost", 9090),
+                Utils.mkSet(t3p0, t3p3));
+        activeTasks.put(task0, Utils.mkSet(t3p0));
+        activeTasks.put(task3, Utils.mkSet(t3p3));
+        standbyTasks.put(task1, Utils.mkSet(t3p1));
+        standbyTasks.put(task2, Utils.mkSet(t3p2));
+
+        final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState);
+        final PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t3p0, t3p3), info.encode());
+
+        Capture<Cluster> capturedCluster = EasyMock.newCapture();
+        taskManager.setPartitionsByHostState(hostState);
+        EasyMock.expectLastCall();
+        taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
+        EasyMock.expectLastCall();
+        taskManager.setClusterMetadata(EasyMock.capture(capturedCluster));
+        EasyMock.expectLastCall();
+        EasyMock.replay(taskManager);
 
-        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
-        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
         partitionAssignor.onAssignment(assignment);
 
-        assertEquals(activeTasks, partitionAssignor.activeTasks());
-        assertEquals(standbyTasks, partitionAssignor.standbyTasks());
+        EasyMock.verify(taskManager);
+
+        assertEquals(Collections.singleton(t3p0.topic()), capturedCluster.getValue().topics());
+        assertEquals(2, capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size());
     }
 
     @Test
     public void testAssignWithInternalTopics() throws Exception {
-        String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -588,12 +578,12 @@ public class StreamPartitionAssignorTest {
         builder.addSink("sink1", "topicX", null, null, null, "processor1");
         builder.addSource(null, "source2", null, null, null, "topicX");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
-        List<String> topics = Utils.mkList("topic1", "test-topicX");
+        List<String> topics = Utils.mkList("topic1", applicationId + "-topicX");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         UUID uuid1 = UUID.randomUUID();
-        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, defaultPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
+        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
         MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
@@ -606,7 +596,7 @@ public class StreamPartitionAssignorTest {
 
         // check prepared internal topics
         assertEquals(1, internalTopicManager.readyTopics.size());
-        assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
+        assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get(applicationId + "-topicX"));
     }
 
     @Test
@@ -626,9 +616,9 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         UUID uuid1 = UUID.randomUUID();
-        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, defaultPartitionGrouper, builder);
+        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
 
-        configurePartitionAssignor(0, null);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
         MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
@@ -646,18 +636,17 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
-        final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source", null, null, null, "input");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
         builder.addSink("sink", "output", null, null, null, "processor");
 
         final UUID uuid1 = UUID.randomUUID();
-        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+        mockTaskManager(Collections.<TaskId>emptySet(),
                                Collections.<TaskId>emptySet(),
                                uuid1,
-                               defaultPartitionGrouper, builder);
-        configurePartitionAssignor(0, userEndPoint);
+                builder);
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
         final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
         assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
@@ -665,7 +654,6 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldMapUserEndPointToTopicPartitions() throws Exception {
-        final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source", null, null, null, "topic1");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -675,8 +663,9 @@ public class StreamPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
 
-        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, defaultPartitionGrouper, builder);
-        configurePartitionAssignor(0, userEndPoint);
+        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
+
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -695,15 +684,13 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
-        final String myEndPoint = "localhost";
-        final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
 
-        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), UUID.randomUUID(), defaultPartitionGrouper, builder);
+        mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), UUID.randomUUID(), builder);
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         try {
-            configurePartitionAssignor(0, myEndPoint);
+            configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost"));
             Assert.fail("expected to an exception due to invalid config");
         } catch (ConfigException e) {
             // pass
@@ -712,12 +699,10 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
-        final String myEndPoint = "localhost:j87yhk";
-        final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
 
         try {
-            configurePartitionAssignor(0, myEndPoint);
+            configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk"));
             Assert.fail("expected to an exception due to invalid config");
         } catch (ConfigException e) {
             // pass
@@ -725,53 +710,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
-        List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
-        final Map<HostInfo, Set<TopicPartition>> hostState =
-                Collections.singletonMap(new HostInfo("localhost", 80),
-                        Collections.singleton(new TopicPartition("topic", 0)));
-        AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)),
-                Collections.<TaskId, Set<TopicPartition>>emptyMap(),
-                hostState);
-        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), UUID.randomUUID(), defaultPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
-
-        partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
-        assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
-    }
-
-    @Test
-    public void shouldSetClusterMetadataOnAssignment() throws Exception {
-        final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
-        final Map<HostInfo, Set<TopicPartition>> hostState =
-                Collections.singletonMap(new HostInfo("localhost", 80),
-                        Collections.singleton(new TopicPartition("topic", 0)));
-        final AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)),
-                Collections.<TaskId, Set<TopicPartition>>emptyMap(),
-                hostState);
-
-
-        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), UUID.randomUUID(), defaultPartitionGrouper, builder);
-        configurePartitionAssignor(0, null);
-        partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
-        final Cluster cluster = partitionAssignor.clusterMetadata();
-        final List<PartitionInfo> partitionInfos = cluster.partitionsForTopic("topic");
-        final PartitionInfo partitionInfo = partitionInfos.get(0);
-        assertEquals(1, partitionInfos.size());
-        assertEquals("topic", partitionInfo.topic());
-        assertEquals(0, partitionInfo.partition());
-    }
-
-    @Test
-    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
-        final Cluster cluster = partitionAssignor.clusterMetadata();
-        assertNotNull(cluster);
-    }
-
-    @Test
     public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() throws Exception {
-        final String applicationId = "application-id";
-
         final StreamsBuilder builder = new StreamsBuilder();
 
         final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
@@ -833,12 +772,11 @@ public class StreamPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+        mockTaskManager(Collections.<TaskId>emptySet(),
                                Collections.<TaskId>emptySet(),
                                UUID.randomUUID(),
-                               defaultPartitionGrouper,
-                               internalTopologyBuilder);
-        configurePartitionAssignor(0, null);
+                internalTopologyBuilder);
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
             config,
@@ -874,53 +812,25 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
+    public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() throws Exception {
         final TopicPartition partitionOne = new TopicPartition("topic", 1);
         final TopicPartition partitionTwo = new TopicPartition("topic", 2);
-        final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
+        final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(
                 new HostInfo("localhost", 9090), Utils.mkSet(partitionOne, partitionTwo));
 
-        final Map<HostInfo, Set<TopicPartition>> secondHostState = new HashMap<>();
-        secondHostState.put(new HostInfo("localhost", 9090), Utils.mkSet(partitionOne));
-        secondHostState.put(new HostInfo("other", 9090), Utils.mkSet(partitionTwo));
+        configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
-        mockThreadDataProvider(Collections.<TaskId>emptySet(),
-                               Collections.<TaskId>emptySet(),
-                               UUID.randomUUID(),
-                               defaultPartitionGrouper,
-                               builder);
-        configurePartitionAssignor(0, null);
-        partitionAssignor.onAssignment(createAssignment(firstHostState));
-        assertEquals(firstHostState, partitionAssignor.getPartitionsByHostState());
-        partitionAssignor.onAssignment(createAssignment(secondHostState));
-        assertEquals(secondHostState, partitionAssignor.getPartitionsByHostState());
-    }
-
-    @Test
-    public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
-        final TopicPartition topicOne = new TopicPartition("topic", 1);
-        final TopicPartition topicTwo = new TopicPartition("topic2", 2);
-        final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
-                new HostInfo("localhost", 9090), Utils.mkSet(topicOne));
+        taskManager.setPartitionsByHostState(hostState);
+        EasyMock.expectLastCall();
+        EasyMock.replay(taskManager);
 
-        final Map<HostInfo, Set<TopicPartition>> secondHostState = Collections.singletonMap(
-                new HostInfo("localhost", 9090), Utils.mkSet(topicOne, topicTwo));
+        partitionAssignor.onAssignment(createAssignment(hostState));
 
-        mockThreadDataProvider(Collections.<TaskId>emptySet(),
-                               Collections.<TaskId>emptySet(),
-                               UUID.randomUUID(),
-                               defaultPartitionGrouper,
-                               builder);
-        configurePartitionAssignor(0, null);
-        partitionAssignor.onAssignment(createAssignment(firstHostState));
-        assertEquals(Utils.mkSet("topic"), partitionAssignor.clusterMetadata().topics());
-        partitionAssignor.onAssignment(createAssignment(secondHostState));
-        assertEquals(Utils.mkSet("topic", "topic2"), partitionAssignor.clusterMetadata().topics());
+        EasyMock.verify(taskManager);
     }
 
     @Test
     public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
-        final String applicationId = "appId";
         final StreamsBuilder builder = new StreamsBuilder();
 
         final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
@@ -929,13 +839,15 @@ public class StreamPartitionAssignorTest {
         builder.stream("topic1").groupByKey().count();
 
         final UUID uuid = UUID.randomUUID();
-        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+        mockTaskManager(Collections.<TaskId>emptySet(),
                                Collections.<TaskId>emptySet(),
                                uuid,
-                               defaultPartitionGrouper,
-                               internalTopologyBuilder);
+                internalTopologyBuilder);
 
-        configurePartitionAssignor(1, userEndPoint);
+        Map<String, Object> props = new HashMap<>();
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint);
+        configurePartitionAssignor(props);
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(
             config,
             mockClientSupplier.restoreConsumer));
@@ -979,7 +891,7 @@ public class StreamPartitionAssignorTest {
     public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() {
         final Map<String, Object> config = new HashMap<>();
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
-        config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream thread");
+        config.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, "i am not a stream thread");
 
         partitionAssignor.configure(config);
     }
@@ -1035,5 +947,4 @@ public class StreamPartitionAssignorTest {
 
         return info;
     }
-
 }