Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/10 21:25:32 UTC

[1/2] kafka git commit: KAFKA-3914: Global discovery of state stores

Repository: kafka
Updated Branches:
  refs/heads/trunk caa9bd0fc -> 68b5d014f


http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 21de73a..9d261bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
@@ -31,10 +32,12 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -85,6 +88,7 @@ public class StreamPartitionAssignorTest {
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
     private final TaskId task3 = new TaskId(0, 3);
+    private String userEndPoint = null;
 
     private Properties configProps() {
         return new Properties() {
@@ -115,7 +119,7 @@ public class StreamPartitionAssignorTest {
 
         String clientId = "client-id";
         UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) {
             @Override
             public Set<TaskId> prevTasks() {
                 return prevTasks;
@@ -137,7 +141,7 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
         standbyTasks.removeAll(prevTasks);
 
-        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
+        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
         assertEquals(info.encode(), subscription.userData());
     }
 
@@ -163,18 +167,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -229,18 +233,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -297,18 +301,18 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -352,18 +356,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -454,7 +458,7 @@ public class StreamPartitionAssignorTest {
         UUID uuid = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), new SystemTime());
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
@@ -464,7 +468,7 @@ public class StreamPartitionAssignorTest {
         standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
         standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
 
-        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
+        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
         PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
         partitionAssignor.onAssignment(assignment);
 
@@ -493,7 +497,7 @@ public class StreamPartitionAssignorTest {
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -503,7 +507,7 @@ public class StreamPartitionAssignorTest {
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
 
         partitionAssignor.assign(metadata, subscriptions);
 
@@ -535,7 +539,7 @@ public class StreamPartitionAssignorTest {
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -545,7 +549,7 @@ public class StreamPartitionAssignorTest {
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
         subscriptions.put("consumer10",
-                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -554,6 +558,139 @@ public class StreamPartitionAssignorTest {
         assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ"));
     }
 
+    @Test
+    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+        final Properties properties = configProps();
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
+        final StreamsConfig config = new StreamsConfig(properties);
+        final TopologyBuilder builder = new TopologyBuilder();
+        final String applicationId = "application-id";
+        builder.setApplicationId(applicationId);
+        builder.addSource("source", "input");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addSink("sink", "output", "processor");
+
+        final UUID uuid1 = UUID.randomUUID();
+        final String client1 = "client1";
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
+        final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
+        assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
+    }
+
+    @Test
+    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+        final Properties properties = configProps();
+        final String myEndPoint = "localhost:8080";
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
+        final StreamsConfig config = new StreamsConfig(properties);
+        final TopologyBuilder builder = new TopologyBuilder();
+        final String applicationId = "application-id";
+        builder.setApplicationId(applicationId);
+        builder.addSource("source", "topic1");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addSink("sink", "output", "processor");
+
+        final List<String> topics = Utils.mkList("topic1");
+
+        final UUID uuid1 = UUID.randomUUID();
+        final String client1 = "client1";
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        subscriptions.put("consumer1",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, myEndPoint).encode()));
+
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
+        final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
+        final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHostState.get(new HostInfo("localhost", 8080));
+        assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
+                new TopicPartition("topic1", 1),
+                new TopicPartition("topic1", 2)), topicPartitions);
+    }
+
+    @Test
+    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+        final Properties properties = configProps();
+        final String myEndPoint = "localhost";
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
+        final StreamsConfig config = new StreamsConfig(properties);
+        final UUID uuid1 = UUID.randomUUID();
+        final String client1 = "client1";
+        final TopologyBuilder builder = new TopologyBuilder();
+        final String applicationId = "application-id";
+        builder.setApplicationId(applicationId);
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1,
+                                                           new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+
+        try {
+            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+            Assert.fail("expected to an exception due to invalid config");
+        } catch (ConfigException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+        final Properties properties = configProps();
+        final String myEndPoint = "localhost:j87yhk";
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
+        final StreamsConfig config = new StreamsConfig(properties);
+        final UUID uuid1 = UUID.randomUUID();
+        final String client1 = "client1";
+        final TopologyBuilder builder = new TopologyBuilder();
+        final String applicationId = "application-id";
+        builder.setApplicationId(applicationId);
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1,
+                                                           new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+
+        try {
+            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+            Assert.fail("expected to an exception due to invalid config");
+        } catch (ConfigException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
+        final Map<HostInfo, Set<TopicPartition>> hostState =
+                Collections.singletonMap(new HostInfo("localhost", 80),
+                        Collections.singleton(new TopicPartition("topic", 0)));
+        AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)),
+                Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+                hostState);
+        partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
+        assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
+    }
+
     private class MockInternalTopicManager extends InternalTopicManager {
 
         public Map<String, Integer> readyTopics = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d1aaa07..3a90ce3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -159,7 +159,7 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
@@ -278,7 +278,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -397,7 +397,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -469,7 +469,7 @@ public class StreamThreadTest {
         StreamsConfig config = new StreamsConfig(configProps());
         MockClientSupplier clientSupplier = new MockClientSupplier();
         StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                               clientId,  processId, new Metrics(), new MockTime());
+                                               clientId,  processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
         assertSame(clientSupplier.producer, thread.producer);
         assertSame(clientSupplier.consumer, thread.consumer);
         assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
new file mode 100644
index 0000000..d110277
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StreamsMetadata;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class StreamsMetadataStateTest {
+
+    private StreamsMetadataState discovery;
+    private HostInfo hostOne;
+    private HostInfo hostTwo;
+    private HostInfo hostThree;
+    private TopicPartition topic1P0;
+    private TopicPartition topic2P0;
+    private TopicPartition topic3P0;
+    private Map<HostInfo, Set<TopicPartition>> hostToPartitions;
+    private KStreamBuilder builder;
+    private TopicPartition topic1P1;
+    private TopicPartition topic2P1;
+    private TopicPartition topic4P0;
+    private List<PartitionInfo> partitionInfos;
+    private Cluster cluster;
+
+    @Before
+    public void before() {
+        builder = new KStreamBuilder();
+        final KStream<Object, Object> one = builder.stream("topic-one");
+        one.groupByKey().count("table-one");
+
+        final KStream<Object, Object> two = builder.stream("topic-two");
+        two.groupByKey().count("table-two");
+
+        builder.stream("topic-three")
+                .groupByKey()
+                .count("table-three");
+
+        builder.merge(one, two).groupByKey().count("merged-table");
+
+        builder.stream("topic-four").mapValues(new ValueMapper<Object, Object>() {
+            @Override
+            public Object apply(final Object value) {
+                return value;
+            }
+        });
+
+        builder.setApplicationId("appId");
+
+        topic1P0 = new TopicPartition("topic-one", 0);
+        topic1P1 = new TopicPartition("topic-one", 1);
+        topic2P0 = new TopicPartition("topic-two", 0);
+        topic2P1 = new TopicPartition("topic-two", 1);
+        topic3P0 = new TopicPartition("topic-three", 0);
+        topic4P0 = new TopicPartition("topic-four", 0);
+
+        hostOne = new HostInfo("host-one", 8080);
+        hostTwo = new HostInfo("host-two", 9090);
+        hostThree = new HostInfo("host-three", 7070);
+        hostToPartitions = new HashMap<>();
+        hostToPartitions.put(hostOne, Utils.mkSet(topic1P0, topic2P1, topic4P0));
+        hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1));
+        hostToPartitions.put(hostThree, Collections.singleton(topic3P0));
+
+        partitionInfos = Arrays.asList(
+                new PartitionInfo("topic-one", 0, null, null, null),
+                new PartitionInfo("topic-one", 1, null, null, null),
+                new PartitionInfo("topic-two", 0, null, null, null),
+                new PartitionInfo("topic-two", 1, null, null, null),
+                new PartitionInfo("topic-three", 0, null, null, null),
+                new PartitionInfo("topic-four", 0, null, null, null));
+
+        cluster = new Cluster(Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet());
+        discovery = new StreamsMetadataState(builder);
+        discovery.onChange(hostToPartitions, cluster);
+    }
+
+    @Test
+    public void shouldGetAllStreamInstances() throws Exception {
+        final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),
+                Utils.mkSet(topic1P0, topic2P1, topic4P0));
+        final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+                Utils.mkSet(topic2P0, topic1P1));
+        final StreamsMetadata three = new StreamsMetadata(hostThree, Collections.singleton("table-three"),
+                Collections.singleton(topic3P0));
+
+        Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+        assertEquals(3, actual.size());
+        assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
+        assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
+        assertTrue("expected " + actual + " to contain " + three, actual.contains(three));
+    }
+
+    @Test
+    public void shouldGetAllStreamsInstancesWithNoStores() throws Exception {
+        builder.stream("topic-five").filter(new Predicate<Object, Object>() {
+            @Override
+            public boolean test(final Object key, final Object value) {
+                return true;
+            }
+        }).to("some-other-topic");
+
+        final TopicPartition tp5 = new TopicPartition("topic-five", 1);
+        final HostInfo hostFour = new HostInfo("host-four", 8080);
+        hostToPartitions.put(hostFour, Utils.mkSet(tp5));
+
+        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null))));
+
+        final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.<String>emptySet(),
+                Collections.singleton(tp5));
+        final Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+        assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected));
+    }
+
+    @Test
+    public void shouldGetInstancesForStoreName() throws Exception {
+        final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),
+                Utils.mkSet(topic1P0, topic2P1, topic4P0));
+        final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+                Utils.mkSet(topic2P0, topic1P1));
+        final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("table-one");
+        assertEquals(2, actual.size());
+        assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
+        assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() throws Exception {
+        discovery.getAllMetadataForStore(null);
+    }
+
+    @Test
+    public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() throws Exception {
+        final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("not-a-store");
+        assertTrue(actual.isEmpty());
+    }
+
+    @Test
+    public void shouldGetInstanceWithKey() throws Exception {
+        final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+        hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
+
+        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
+
+        final StreamsMetadata expected = new StreamsMetadata(hostThree, Collections.singleton("table-three"),
+                Collections.singleton(topic3P0));
+
+        final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key",
+                Serdes.String().serializer());
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldGetInstanceWithKeyAndCustomPartitioner() throws Exception {
+        final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+        hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
+
+        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
+
+        final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-three", "merged-table"),
+                Utils.mkSet(topic2P0, tp4));
+
+        StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", new StreamPartitioner<String, Object>() {
+            @Override
+            public Integer partition(final String key, final Object value, final int numPartitions) {
+                return 1;
+            }
+        });
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldGetInstanceWithKeyWithMergedStreams() throws Exception {
+        final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
+        hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2));
+        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
+
+        final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+                Utils.mkSet(topic2P0, topic1P1, topic2P2));
+
+        final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() {
+            @Override
+            public Integer partition(final String key, final Object value, final int numPartitions) {
+                return 2;
+            }
+        });
+
+        assertEquals(expected, actual);
+
+    }
+
+    @Test
+    public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() throws Exception {
+        final StreamsMetadata actual = discovery.getMetadataWithKey("not-a-store",
+                "key",
+                Serdes.String().serializer());
+        assertNull(actual);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowWhenKeyIsNull() throws Exception {
+        discovery.getMetadataWithKey("table-three", null, Serdes.String().serializer());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowWhenSerializerIsNull() throws Exception {
+        discovery.getMetadataWithKey("table-three", "key", (Serializer) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfStoreNameIsNull() throws Exception {
+        discovery.getMetadataWithKey(null, "key", Serdes.String().serializer());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfStreamPartitionerIsNull() throws Exception {
+        discovery.getMetadataWithKey(null, "key", (StreamPartitioner) null);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
deleted file mode 100644
index 14a7f9a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class AssginmentInfoTest {
-
-    @Test
-    public void testEncodeDecode() {
-        List<TaskId> activeTasks =
-                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
-        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
-        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
-
-        AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks);
-        AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
-
-        assertEquals(info, decoded);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
new file mode 100644
index 0000000..ce94a23
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class AssignmentInfoTest {
+
+    @Test
+    public void testEncodeDecode() {
+        List<TaskId> activeTasks =
+                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+
+        AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
+        AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
+
+        assertEquals(info, decoded);
+    }
+
+    @Test
+    public void shouldDecodePreviousVersion() throws Exception {
+        List<TaskId> activeTasks =
+                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+        final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null);
+        final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
+        assertEquals(oldVersion.activeTasks, decoded.activeTasks);
+        assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
+        assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as it wasn't in V1
+        assertEquals(2, decoded.version); // automatically upgraded to v2 on decode
+    }
+
+
+    /**
+     * This is a clone of what the V1 encoding did. The encode method has changed for V2,
+     * so compatibility with V1 cannot be tested without it.
+     */
+    private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(baos);
+        // Encode version
+        out.writeInt(oldVersion.version);
+        // Encode active tasks
+        out.writeInt(oldVersion.activeTasks.size());
+        for (TaskId id : oldVersion.activeTasks) {
+            id.writeTo(out);
+        }
+        // Encode standby tasks
+        out.writeInt(oldVersion.standbyTasks.size());
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks.entrySet()) {
+            TaskId id = entry.getKey();
+            id.writeTo(out);
+
+            Set<TopicPartition> partitions = entry.getValue();
+            out.writeInt(partitions.size());
+            for (TopicPartition partition : partitions) {
+                out.writeUTF(partition.topic());
+                out.writeInt(partition.partition());
+            }
+        }
+
+        out.flush();
+        out.close();
+
+        return ByteBuffer.wrap(baos.toByteArray());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 3119bee..cf6d8c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -20,12 +20,15 @@ package org.apache.kafka.streams.processor.internals.assignment;
 import org.apache.kafka.streams.processor.TaskId;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class SubscriptionInfoTest {
 
@@ -38,10 +41,62 @@ public class SubscriptionInfoTest {
         Set<TaskId> standbyTasks =
                 new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
 
-        SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks);
+        SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, null);
         SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
 
         assertEquals(info, decoded);
     }
 
+    @Test
+    public void shouldEncodeDecodeWithUserEndPoint() throws Exception {
+        SubscriptionInfo original = new SubscriptionInfo(UUID.randomUUID(),
+                Collections.singleton(new TaskId(0, 0)), Collections.<TaskId>emptySet(), "localhost:80");
+        SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode());
+        assertEquals(original, decoded);
+    }
+
+    @Test
+    public void shouldBeBackwardCompatible() throws Exception {
+        UUID processId = UUID.randomUUID();
+
+        Set<TaskId> activeTasks =
+                new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
+        Set<TaskId> standbyTasks =
+                new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
+
+        final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks);
+        final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
+        assertEquals(activeTasks, decode.prevTasks);
+        assertEquals(standbyTasks, decode.standbyTasks);
+        assertEquals(processId, decode.processId);
+        assertNull(decode.userEndPoint);
+
+    }
+
+
+    /**
+     * This is a clone of what the V1 encoding did. The encode method has changed for V2,
+     * so compatibility with V1 cannot be tested without it.
+     */
+    private ByteBuffer encodePreviousVersion(UUID processId,  Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
+        // version
+        buf.putInt(1);
+        // encode client UUID
+        buf.putLong(processId.getMostSignificantBits());
+        buf.putLong(processId.getLeastSignificantBits());
+        // encode ids of previously running tasks
+        buf.putInt(prevTasks.size());
+        for (TaskId id : prevTasks) {
+            id.writeTo(buf);
+        }
+        // encode ids of cached tasks
+        buf.putInt(standbyTasks.size());
+        for (TaskId id : standbyTasks) {
+            id.writeTo(buf);
+        }
+        buf.rewind();
+
+        return buf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a112a5a..0416e40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -66,7 +67,7 @@ public class StreamThreadStateStoreProviderTest {
     public void before() throws IOException {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("the-source", "the-source");
-        builder.addProcessor("the-processor", new MockProcessorSupplier());
+        builder.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
         builder.addStateStore(Stores.create("kv-store")
                                   .withStringKeys()
                                   .withStringValues().inMemory().build(), "the-processor");
@@ -106,7 +107,7 @@ public class StreamThreadStateStoreProviderTest {
         thread = new StreamThread(builder, streamsConfig, clientSupplier,
                                   applicationId,
                                   "clientId", UUID.randomUUID(), new Metrics(),
-                                  new SystemTime()) {
+                                  new SystemTime(), new StreamsMetadataState(builder)) {
             @Override
             public Map<TaskId, StreamTask> tasks() {
                 return tasks;


[2/2] kafka git commit: KAFKA-3914: Global discovery of state stores

Posted by gu...@apache.org.
KAFKA-3914: Global discovery of state stores

guozhangwang enothereska mjsax miguno, please take a look. A few things need to be clarified:

1. I've added StreamsConfig.USER_ENDPOINT_CONFIG, but should we have separate configs for host and port, or is this single config OK?
2. `HostState` in the KIP has a byte[] field - I'm not sure why, or what it would be populated with
3. I've changed the API to return `List<KafkaStreamsInstance>` as opposed to `Map<HostInfo, Set<TaskMetadata>>`, as I find this far more intuitive to work with (see the usage sketch below).
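
For reference, a minimal sketch of how the discovery API added in this commit is exercised, mirroring the new StreamsMetadataStateTest further down; `builder`, `hostToPartitions`, and `cluster` stand in for the test fixtures, and the types come from org.apache.kafka.streams.state and org.apache.kafka.streams.processor.internals as imported by that test:

    // assumes a TopologyBuilder wired with a state store named "table-one" and a
    // host-to-partition mapping pushed in via the partition assignor callback
    StreamsMetadataState discovery = new StreamsMetadataState(builder);
    discovery.onChange(hostToPartitions, cluster);

    // all instances of this application, with their stores and topic partitions
    Collection<StreamsMetadata> all = discovery.getAllMetadata();

    // only the instances that host the store "table-one"
    Collection<StreamsMetadata> hosts = discovery.getAllMetadataForStore("table-one");

    // the single instance that owns the partition holding the given key
    StreamsMetadata owner = discovery.getMetadataWithKey("table-one", "key",
            Serdes.String().serializer());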

Author: Damian Guy <da...@gmail.com>

Reviewers: Matthias J. Sax, Michael G. Noll, Eno Thereska, Guozhang Wang

Closes #1576 from dguy/kafka-3914v2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68b5d014
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68b5d014
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68b5d014

Branch: refs/heads/trunk
Commit: 68b5d014fa3bf2e18da253ded2bbcd4f5d4a9d8d
Parents: caa9bd0
Author: Damian Guy <da...@gmail.com>
Authored: Wed Aug 10 14:25:23 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Aug 10 14:25:23 2016 -0700

----------------------------------------------------------------------
 .../producer/internals/DefaultPartitioner.java  |  22 +-
 .../org/apache/kafka/common/utils/Utils.java    |  16 ++
 .../org/apache/kafka/streams/KafkaStreams.java  |  94 ++++++-
 .../org/apache/kafka/streams/StreamsConfig.java |  10 +
 .../internals/WindowedStreamPartitioner.java    |   5 +-
 .../streams/processor/TopologyBuilder.java      |  55 +++-
 .../internals/DefaultStreamPartitioner.java     |  43 +++
 .../internals/StreamPartitionAssignor.java      | 103 +++++++-
 .../processor/internals/StreamThread.java       |  10 +-
 .../internals/StreamsMetadataState.java         | 237 +++++++++++++++++
 .../internals/assignment/AssignmentInfo.java    | 155 ++++++-----
 .../internals/assignment/SubscriptionInfo.java  |  91 ++++---
 .../apache/kafka/streams/state/HostInfo.java    |  81 ++++++
 .../kafka/streams/state/StreamsMetadata.java    |  93 +++++++
 .../apache/kafka/streams/KafkaStreamsTest.java  |  43 ++-
 .../integration/RegexSourceIntegrationTest.java |   3 +-
 .../streams/kstream/KStreamBuilderTest.java     |  51 ++++
 .../streams/processor/TopologyBuilderTest.java  |  35 +++
 .../internals/StreamPartitionAssignorTest.java  | 185 +++++++++++--
 .../processor/internals/StreamThreadTest.java   |   8 +-
 .../internals/StreamsMetadataStateTest.java     | 262 +++++++++++++++++++
 .../assignment/AssginmentInfoTest.java          |  50 ----
 .../assignment/AssignmentInfoTest.java          | 107 ++++++++
 .../assignment/SubscriptionInfoTest.java        |  57 +++-
 .../StreamThreadStateStoreProviderTest.java     |   5 +-
 25 files changed, 1599 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index f81c496..241e809 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -37,22 +37,6 @@ public class DefaultPartitioner implements Partitioner {
 
     private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
 
-    /**
-     * A cheap way to deterministically convert a number to a positive value. When the input is
-     * positive, the original value is returned. When the input number is negative, the returned
- * positive value is the original value bitwise ANDed with 0x7fffffff, which is not its
- * absolute value.
-     *
-     * Note: changing this method in the future will possibly cause partition selection not to be
-     * compatible with the existing messages already placed on a partition.
-     *
-     * @param number a given number
-     * @return a positive number.
-     */
-    private static int toPositive(int number) {
-        return number & 0x7fffffff;
-    }
-
     public void configure(Map<String, ?> configs) {}
 
     /**
@@ -72,15 +56,15 @@ public class DefaultPartitioner implements Partitioner {
             int nextValue = counter.getAndIncrement();
             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
             if (availablePartitions.size() > 0) {
-                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
+                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                 return availablePartitions.get(part).partition();
             } else {
                 // no partitions are available, give a non-available partition
-                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
+                return Utils.toPositive(nextValue) % numPartitions;
             }
         } else {
             // hash the keyBytes to choose a partition
-            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e740618..4629baf 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -699,4 +699,20 @@ public class Utils {
             throw exception;
     }
 
+    /**
+     * A cheap way to deterministically convert a number to a positive value. When the input is
+     * positive, the original value is returned. When the input number is negative, the returned
+     * positive value is the original value bit-ANDed with 0x7fffffff, which is not its absolute
+     * value.
+     *
+     * Note: changing this method in the future may make partition selection incompatible with
+     * messages already placed on a partition, since it is used
+     * in the producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}
+     *
+     * @param number a given number
+     * @return a positive number.
+     */
+    public static int toPositive(int number) {
+        return number & 0x7fffffff;
+    }
 }

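For illustration, a standalone sketch of why the mask is preferred over Math.abs
(a hypothetical demo class, not something in this patch):

    import org.apache.kafka.common.utils.Utils;

    public class ToPositiveDemo {
        public static void main(String[] args) {
            int h = Integer.MIN_VALUE;
            System.out.println(Math.abs(h));           // -2147483648: abs overflows and stays negative
            System.out.println(Utils.toPositive(h));   // 0: the mask simply clears the sign bit
            System.out.println(Utils.toPositive(-17)); // 2147483631: deterministic, but not abs(-17)
        }
    }
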
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b9553c9..4fabdf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -22,28 +22,34 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
  * sends output to zero or more output topics.
@@ -104,6 +110,7 @@ public class KafkaStreams {
     // of the co-location of stream thread's consumers. It is for internal
     // usage only and should not be exposed to users at all.
     private final UUID processId;
+    private StreamsMetadataState streamsMetadataState;
 
     private final StreamsConfig config;
 
@@ -164,11 +171,19 @@ public class KafkaStreams {
 
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
+        streamsMetadataState = new StreamsMetadataState(builder);
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, this.processId, this.metrics, time);
+            this.threads[i] = new StreamThread(builder,
+                                               config,
+                                               clientSupplier,
+                                               applicationId,
+                                               clientId,
+                                               processId,
+                                               metrics,
+                                               time,
+                                               streamsMetadataState);
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
-
         this.queryableStoreProvider = new QueryableStoreProvider(storeProviders);
     }
 
@@ -274,6 +289,77 @@ public class KafkaStreams {
             thread.setUncaughtExceptionHandler(eh);
     }
 
+
+    /**
+     * Find all of the instances of {@link StreamsMetadata} in the {@link KafkaStreams} application that this instance belongs to
+     *
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     * @return collection containing all instances of {@link StreamsMetadata} in the {@link KafkaStreams} application that this instance belongs to
+     */
+    public Collection<StreamsMetadata> allMetadata() {
+        validateIsRunning();
+        return streamsMetadataState.getAllMetadata();
+    }
+
+
+    /**
+     * Find instances of {@link StreamsMetadata} that contain the given storeName
+     *
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     * @param storeName the storeName to find metadata for
+     * @return  A collection containing instances of {@link StreamsMetadata} that have the provided storeName
+     */
+    public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+        validateIsRunning();
+        return streamsMetadataState.getAllMetadataForStore(storeName);
+    }
+
+    /**
+     * Find the {@link StreamsMetadata} instance that contains the given storeName
+     * and whose hosted store instance would contain the given key. This will use
+     * the {@link org.apache.kafka.streams.processor.internals.DefaultStreamPartitioner} to
+     * locate the partition. If a custom partitioner has been used, please use
+     * {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)}
+     *
+     * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+     * this method provides a way of finding which host it would exist on.
+     *
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     * @param storeName         Name of the store
+     * @param key               Key to use to locate the partition
+     * @param keySerializer     Serializer for the key
+     * @param <K>               key type
+     * @return  The {@link StreamsMetadata} for the storeName and key
+     */
+    public <K> StreamsMetadata metadataForKey(final String storeName,
+                                              final K key,
+                                              final Serializer<K> keySerializer) {
+        validateIsRunning();
+        return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
+    }
+
+    /**
+     * Find the {@link StreamsMetadata} instance that contains the given storeName
+     * and whose hosted store instance would contain the given key
+     *
+     * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+     * this method provides a way of finding which host it would exist on.
+     *
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     * @param storeName         Name of the store
+     * @param key               Key to use to locate the partition
+     * @param partitioner       Partitioner for the store
+     * @param <K>               key type
+     * @return  The {@link StreamsMetadata} for the storeName and key
+     */
+    public <K> StreamsMetadata metadataForKey(final String storeName,
+                                              final K key,
+                                              final StreamPartitioner<K, ?> partitioner) {
+        validateIsRunning();
+        return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
+    }
+
+
     /**
      * Get a facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances
      * with the provided storeName and accepted by {@link QueryableStoreType#accepts(StateStore)}.

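For orientation, a hedged usage sketch of the new discovery methods. The store
name "word-counts", the key and the HTTP path are placeholders; `streams` is a
started KafkaStreams instance, and hostInfo() is assumed to be the StreamsMetadata
accessor for the advertised HostInfo:

    Collection<StreamsMetadata> hosts = streams.allMetadataForStore("word-counts");

    StreamsMetadata metadata =
        streams.metadataForKey("word-counts", "hello", Serdes.String().serializer());
    if (metadata != null) {
        HostInfo host = metadata.hostInfo();
        // e.g. GET http://<host.host()>:<host.port()>/state/word-counts/hello
        // against the owning instance's user-defined application.server endpoint
    }
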
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b624e0e..41498cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -109,6 +109,10 @@ public class StreamsConfig extends AbstractConfig {
     public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
     public static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface.";
 
+    /** <code>application.server</code> */
+    public static final String APPLICATION_SERVER_CONFIG = "application.server";
+    public static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user-defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application";
+
     /** <code>metrics.sample.window.ms</code> */
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
@@ -225,6 +229,11 @@ public class StreamsConfig extends AbstractConfig {
                                         atLeast(1),
                                         Importance.LOW,
                                         CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                                .define(APPLICATION_SERVER_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.LOW,
+                                        APPLICATION_SERVER_DOC)
                                 .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
                                         Type.CLASS,
                                         null,
@@ -319,6 +328,7 @@ public class StreamsConfig extends AbstractConfig {
         if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals(""))
             props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
 
+        props.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         return props;
     }
 

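A minimal sketch of wiring up the new config (all values are placeholders, and
`builder` is assumed to be a configured TopologyBuilder). The endpoint must be
the host:port at which other instances can reach this instance:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // the default "" means this instance advertises no endpoint for discovery
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
    KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
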
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 1e30864..ba9873b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
+import static org.apache.kafka.common.utils.Utils.toPositive;
+
 public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
 
     private final WindowedSerializer<K> serializer;
@@ -45,8 +47,5 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
         return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
 
-    private static int toPositive(int number) {
-        return number & 0x7fffffff;
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 8e3dc7a..7b79236 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -68,6 +68,7 @@ public class TopologyBuilder {
     private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
     private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
     private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
     private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>();
     private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
     private String applicationId;
@@ -616,12 +617,40 @@ public class TopologyBuilder {
 
         NodeFactory nodeFactory = nodeFactories.get(processorName);
         if (nodeFactory instanceof ProcessorNodeFactory) {
-            ((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName);
+            ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+            processorNodeFactory.addStateStore(stateStoreName);
+            connectStateStoreNameToSourceTopics(stateStoreName, processorNodeFactory);
         } else {
             throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
         }
     }
 
+
+    private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
+        final Set<String> sourceTopics = new HashSet<>();
+        for (String parent : parents) {
+            NodeFactory nodeFactory = nodeFactories.get(parent);
+            if (nodeFactory instanceof SourceNodeFactory) {
+                sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
+            } else if (nodeFactory instanceof ProcessorNodeFactory) {
+                sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
+            }
+        }
+        return sourceTopics;
+    }
+
+    private void connectStateStoreNameToSourceTopics(final String stateStoreName,
+                                                     final ProcessorNodeFactory processorNodeFactory) {
+
+        final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
+        if (sourceTopics.isEmpty()) {
+            throw new TopologyBuilderException("can't find source topic for state store " +
+                    stateStoreName);
+        }
+        stateStoreNameToSourceTopics.put(stateStoreName,
+                Collections.unmodifiableSet(sourceTopics));
+    }
+
     /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
@@ -875,20 +904,25 @@ public class TopologyBuilder {
      * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
      */
     public synchronized Set<String> sourceTopics() {
+        Set<String> topics = maybeDecorateInternalSourceTopics(sourceTopicNames);
+        return Collections.unmodifiableSet(topics);
+    }
+
+    private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopicNames) {
         Set<String> topics = new HashSet<>();
         for (String topic : sourceTopicNames) {
             if (internalTopicNames.contains(topic)) {
                 if (applicationId == null) {
                     throw new TopologyBuilderException("there are internal topics and "
                                                        + "applicationId is null. Call "
-                                                       + "setApplicationId before sourceTopics");
+                                                       + "setApplicationId first");
                 }
                 topics.add(applicationId + "-" + topic);
             } else {
                 topics.add(topic);
             }
         }
-        return Collections.unmodifiableSet(topics);
+        return topics;
     }
 
     public synchronized Pattern sourceTopicPattern() {
@@ -917,7 +951,8 @@ public class TopologyBuilder {
 
     /**
      * Set the applicationId. This is required before calling
-     * {@link #sourceTopics}, {@link #topicGroups} and {@link #copartitionSources}
+     * {@link #sourceTopics}, {@link #topicGroups}, {@link #copartitionSources}, and
+     * {@link #stateStoreNameToSourceTopics}
      * @param applicationId   the streams applicationId. Should be the same as set by
      * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
      */
@@ -925,4 +960,16 @@ public class TopologyBuilder {
         Objects.requireNonNull(applicationId, "applicationId can't be null");
         this.applicationId = applicationId;
     }
+
+    /**
+     * @return a mapping from state store name to the set of source topics that feed it.
+     */
+    public Map<String, Set<String>> stateStoreNameToSourceTopics() {
+        final Map<String, Set<String>> results = new HashMap<>();
+        for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) {
+            results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue()));
+
+        }
+        return results;
+    }
 }

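A sketch of what the new mapping captures; MyProcessorSupplier, MyStoreSupplier
and the topic/store names are hypothetical:

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source", "input-topic");
    builder.addProcessor("process", new MyProcessorSupplier(), "source");
    builder.addStateStore(new MyStoreSupplier("my-store"), "process");

    builder.setApplicationId("app"); // required first, per the javadoc above
    // => {"my-store" -> ["input-topic"]}
    Map<String, Set<String>> mapping = builder.stateStoreNameToSourceTopics();
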
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
new file mode 100644
index 0000000..006f010
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
+
+    private final Serializer<K> keySerializer;
+    private final Cluster cluster;
+    private final String topic;
+    private final DefaultPartitioner defaultPartitioner;
+
+    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
+        this.keySerializer = keySerializer;
+        this.cluster = cluster;
+        this.topic = topic;
+        this.defaultPartitioner = new DefaultPartitioner();
+    }
+
+    @Override
+    public Integer partition(final K key, final V value, final int numPartitions) {
+        final byte[] keyBytes = keySerializer.serialize(topic, key);
+        return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 4b52511..fd70a01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,7 +24,9 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -33,7 +35,7 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +55,10 @@ import java.util.UUID;
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+    private String userEndPointConfig;
+    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+    private Cluster metadataWithInternalTopics;
+
 
     private static class AssignedPartition implements Comparable<AssignedPartition> {
         public final TaskId taskId;
@@ -119,6 +125,25 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
+        String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
+        if (userEndPoint != null && !userEndPoint.isEmpty()) {
+            final String[] hostPort = userEndPoint.split(":");
+            if (hostPort.length != 2) {
+                throw new ConfigException(String.format("Config %s isn't in the correct format. Expected a host:port pair" +
+                                                       " but received %s",
+                                                        StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+            } else {
+                try {
+                    Integer.valueOf(hostPort[1]);
+                    this.userEndPointConfig = userEndPoint;
+                } catch (NumberFormatException nfe) {
+                    throw new ConfigException(String.format("Invalid port %s supplied in %s for config %s",
+                                                           hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
+                }
+            }
+
+        }
+
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
                     (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
@@ -143,7 +168,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Set<TaskId> prevTasks = streamThread.prevTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
+        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
 
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
@@ -228,6 +253,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Map<UUID, Set<String>> consumersByClient = new HashMap<>();
         Map<UUID, ClientState<TaskId>> states = new HashMap<>();
         SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+        Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
         // decode subscription info
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
@@ -239,7 +265,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
 
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-
+            if (info.userEndPoint != null) {
+                final String[] hostPort = info.userEndPoint.split(":");
+                consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
+            }
             Set<String> consumers = consumersByClient.get(info.processId);
             if (consumers == null) {
                 consumers = new HashSet<>();
@@ -327,7 +356,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
         internalSourceTopicToTaskIds.clear();
 
-        Cluster metadataWithInternalTopics = metadata;
+        metadataWithInternalTopics = metadata;
         if (internalTopicManager != null)
             metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
 
@@ -361,8 +390,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         // assign tasks to clients
         states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
-        Map<String, Assignment> assignment = new HashMap<>();
 
+        final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>();
+
+        final Map<HostInfo, Set<TopicPartition>> endPointMap = new HashMap<>();
         for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
             UUID processId = entry.getKey();
             Set<String> consumers = entry.getValue();
@@ -408,14 +439,24 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 for (AssignedPartition partition : assignedPartitions) {
                     active.add(partition.taskId);
                     activePartitions.add(partition.partition);
+                    HostInfo hostInfo = consumerEndPointMap.get(processId);
+                    if (hostInfo != null) {
+                        if (!endPointMap.containsKey(hostInfo)) {
+                            endPointMap.put(hostInfo, new HashSet<TopicPartition>());
+                        }
+                        final Set<TopicPartition> topicPartitions = endPointMap.get(hostInfo);
+                        topicPartitions.add(partition.partition);
+                    }
                 }
 
-                AssignmentInfo data = new AssignmentInfo(active, standby);
-                assignment.put(consumer, new Assignment(activePartitions, data.encode()));
-                i++;
 
-                active.clear();
-                standby.clear();
+                assignmentSuppliers.add(new AssignmentSupplier(consumer,
+                                                               active,
+                                                               standby,
+                                                               endPointMap,
+                                                               activePartitions));
+
+                i++;
             }
         }
 
@@ -424,9 +465,39 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // change log topics should be compacted
         prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, true);
 
+        Map<String, Assignment> assignment = new HashMap<>();
+        for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) {
+            assignment.put(assignmentSupplier.consumer, assignmentSupplier.get());
+        }
         return assignment;
     }
 
+    class AssignmentSupplier {
+        private final String consumer;
+        private final List<TaskId> active;
+        private final Map<TaskId, Set<TopicPartition>> standby;
+        private final Map<HostInfo, Set<TopicPartition>> endPointMap;
+        private final List<TopicPartition> activePartitions;
+
+        AssignmentSupplier(final String consumer,
+                           final List<TaskId> active,
+                           final Map<TaskId, Set<TopicPartition>> standby,
+                           final Map<HostInfo, Set<TopicPartition>> endPointMap,
+                           final List<TopicPartition> activePartitions) {
+            this.consumer = consumer;
+            this.active = active;
+            this.standby = standby;
+            this.endPointMap = endPointMap;
+            this.activePartitions = activePartitions;
+        }
+
+        Assignment get() {
+            return new Assignment(activePartitions, new AssignmentInfo(active,
+                                                                       standby,
+                                                                       endPointMap).encode());
+        }
+    }
+
     /**
      * @throws TaskAssignmentException if there is no task id for one of the partitions specified
      */
@@ -460,6 +531,18 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
         this.partitionToTaskIds = partitionToTaskIds;
+        this.partitionsByHostState = info.partitionsByHostState;
+    }
+
+    public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
+        if (partitionsByHostState == null) {
+            return Collections.emptyMap();
+        }
+        return Collections.unmodifiableMap(partitionsByHostState);
+    }
+
+    public Cluster clusterMetadata() {
+        return metadataWithInternalTopics;
     }
 
     private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) {

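For reference, how the configure() validation above treats a few example values
(all hypothetical):

    "host1:8080" -> accepted; advertised to the group via SubscriptionInfo
    ""           -> ignored (the default); no endpoint is advertised
    "host1"      -> ConfigException: expected a host:port pair
    "host1:xyz"  -> ConfigException: invalid port
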
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c84cae0..f416443 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +70,7 @@ public class StreamThread extends Thread {
     private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     public final PartitionGrouper partitionGrouper;
+    private final StreamsMetadataState streamsMetadataState;
     public final String applicationId;
     public final String clientId;
     public final UUID processId;
@@ -110,6 +112,7 @@ public class StreamThread extends Thread {
                 addStreamTasks(assignment);
                 addStandbyTasks();
                 lastClean = time.milliseconds(); // start the cleaning cycle
+                streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
@@ -127,6 +130,7 @@ public class StreamThread extends Thread {
             } finally {
                 // TODO: right now upon partition revocation, we always remove all the tasks;
                 // this behavior can be optimized to only remove affected tasks in the future
+                streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata());
                 removeStreamTasks();
                 removeStandbyTasks();
             }
@@ -140,7 +144,8 @@ public class StreamThread extends Thread {
                         String clientId,
                         UUID processId,
                         Metrics metrics,
-                        Time time) {
+                        Time time,
+                        StreamsMetadataState streamsMetadataState) {
         super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
         this.applicationId = applicationId;
@@ -151,6 +156,7 @@ public class StreamThread extends Thread {
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+        this.streamsMetadataState = streamsMetadataState;
 
         // set the producer and consumer clients
         String threadName = getName();
@@ -500,6 +506,8 @@ public class StreamThread extends Thread {
         return tasks;
     }
 
+
+
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
new file mode 100644
index 0000000..eeb3bc9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StreamsMetadata;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+
+/**
+ * Provides access to the {@link StreamsMetadata} in a KafkaStreams application. This can be used
+ * to discover the locations of {@link org.apache.kafka.streams.processor.StateStore}s
+ * in a KafkaStreams application.
+ */
+public class StreamsMetadataState {
+    private final TopologyBuilder builder;
+    private final List<StreamsMetadata> allMetadata = new ArrayList<>();
+    private Cluster clusterMetadata;
+
+    public StreamsMetadataState(final TopologyBuilder builder) {
+        this.builder = builder;
+    }
+
+    /**
+     * Find all of the {@link StreamsMetadata}s in a
+     * {@link KafkaStreams} application
+     *
+     * @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application
+     */
+    public synchronized Collection<StreamsMetadata> getAllMetadata() {
+        return allMetadata;
+    }
+    
+    /**
+     * Find all of the {@link StreamsMetadata}s for a given storeName
+     *
+     * @param storeName the storeName to find metadata for
+     * @return A collection of {@link StreamsMetadata} that have the provided storeName
+     */
+    public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
+        Objects.requireNonNull(storeName, "storeName cannot be null");
+
+        final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
+        if (sourceTopics == null) {
+            return Collections.emptyList();
+        }
+
+        final ArrayList<StreamsMetadata> results = new ArrayList<>();
+        for (StreamsMetadata metadata : allMetadata) {
+            if (metadata.stateStoreNames().contains(storeName)) {
+                results.add(metadata);
+            }
+        }
+        return results;
+    }
+
+    /**
+     * Find the {@link StreamsMetadata} for a given storeName and key. This method will use the
+     * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used,
+     * please use {@link StreamsMetadataState#getMetadataWithKey(String, Object, StreamPartitioner)}
+     *
+     * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+     * this method provides a way of finding which {@link StreamsMetadata} it would exist on.
+     *
+     * @param storeName     Name of the store
+     * @param key           Key to use
+     * @param keySerializer Serializer for the key
+     * @param <K>           key type
+     * @return The {@link StreamsMetadata} for the storeName and key
+     */
+    public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
+                                                               final K key,
+                                                               final Serializer<K> keySerializer) {
+        Objects.requireNonNull(keySerializer, "keySerializer can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(key, "key can't be null");
+
+        final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
+        if (sourceTopicsInfo == null) {
+            return null;
+        }
+        return getStreamsMetadataForKey(storeName,
+                                        key,
+                                        new DefaultStreamPartitioner<>(keySerializer,
+                                                                       clusterMetadata,
+                                                                       sourceTopicsInfo.topicWithMostPartitions),
+                                        sourceTopicsInfo);
+    }
+
+
+
+
+
+    /**
+     * Find the {@link StreamsMetadata} for a given storeName and key.
+     *
+     * Note: the key may not exist in the {@link StateStore},
+     * this method provides a way of finding which {@link StreamsMetadata} it would exist on.
+     *
+     * @param storeName   Name of the store
+     * @param key         Key to use
+     * @param partitioner partitioner to use to find correct partition for key
+     * @param <K>         key type
+     * @return The {@link StreamsMetadata} for the storeName and key
+     */
+    public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
+                                                               final K key,
+                                                               final StreamPartitioner<K, ?> partitioner) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(key, "key can't be null");
+        Objects.requireNonNull(partitioner, "partitioner can't be null");
+
+        SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
+        if (sourceTopicsInfo == null) {
+            return null;
+        }
+        return getStreamsMetadataForKey(storeName, key, partitioner, sourceTopicsInfo);
+    }
+
+    /**
+     * Respond to changes in the {@link HostInfo} to {@link TopicPartition} mapping and
+     * rebuild the metadata
+     * @param currentState  the current mapping of {@link HostInfo} -> {@link TopicPartition}s
+     * @param clusterMetadata    the current clusterMetadata {@link Cluster}
+     */
+    public synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> currentState, final Cluster clusterMetadata) {
+        this.clusterMetadata = clusterMetadata;
+        rebuildMetadata(currentState);
+    }
+
+    private boolean hasPartitionsForAnyTopics(final Set<String> topicNames, final Set<TopicPartition> partitionForHost) {
+        for (TopicPartition topicPartition : partitionForHost) {
+            if (topicNames.contains(topicPartition.topic())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> currentState) {
+        allMetadata.clear();
+        if (currentState.isEmpty()) {
+            return;
+        }
+        final Map<String, Set<String>> stores = builder.stateStoreNameToSourceTopics();
+        for (Map.Entry<HostInfo, Set<TopicPartition>> entry : currentState.entrySet()) {
+            final HostInfo key = entry.getKey();
+            final Set<TopicPartition> partitionsForHost = new HashSet<>(entry.getValue());
+            final Set<String> storesOnHost = new HashSet<>();
+            for (Map.Entry<String, Set<String>> storeTopicEntry : stores.entrySet()) {
+                final Set<String> topicsForStore = storeTopicEntry.getValue();
+                if (hasPartitionsForAnyTopics(topicsForStore, partitionsForHost)) {
+                    storesOnHost.add(storeTopicEntry.getKey());
+                }
+            }
+            allMetadata.add(new StreamsMetadata(key, storesOnHost, partitionsForHost));
+        }
+    }
+
+    private <K> StreamsMetadata getStreamsMetadataForKey(final String storeName,
+                                                         final K key,
+                                                         final StreamPartitioner<K, ?> partitioner,
+                                                         final SourceTopicsInfo sourceTopicsInfo) {
+
+        final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
+        final Set<TopicPartition> matchingPartitions = new HashSet<>();
+        for (String sourceTopic : sourceTopicsInfo.sourceTopics) {
+            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
+        }
+
+        for (StreamsMetadata streamsMetadata : allMetadata) {
+            final Set<String> stateStoreNames = streamsMetadata.stateStoreNames();
+            final Set<TopicPartition> topicPartitions = new HashSet<>(streamsMetadata.topicPartitions());
+            topicPartitions.retainAll(matchingPartitions);
+            if (stateStoreNames.contains(storeName)
+                    && !topicPartitions.isEmpty()) {
+                return streamsMetadata;
+            }
+        }
+        return null;
+    }
+
+    private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
+        final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
+        if (sourceTopics == null || sourceTopics.isEmpty()) {
+            return null;
+        }
+        return new SourceTopicsInfo(sourceTopics);
+    }
+
+    private class SourceTopicsInfo {
+        private final Set<String> sourceTopics;
+        private int maxPartitions;
+        private String topicWithMostPartitions;
+
+        private SourceTopicsInfo(final Set<String> sourceTopics) {
+            this.sourceTopics = sourceTopics;
+            for (String topic : sourceTopics) {
+                final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic);
+                if (partitions != null && partitions.size() > maxPartitions) {
+                    maxPartitions = partitions.size();
+                    topicWithMostPartitions = partitions.get(0).topic();
+                }
+            }
+        }
+    }
+}

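Where a store was written with a custom partitioning scheme, a hedged sketch of
the matching lookup (store name, key and scheme are hypothetical; the partitioner
must mirror how the data was originally partitioned, and `streams` is a started
KafkaStreams instance):

    StreamPartitioner<String, Object> partitioner = new StreamPartitioner<String, Object>() {
        @Override
        public Integer partition(String key, Object value, int numPartitions) {
            return Utils.toPositive(key.hashCode()) % numPartitions;
        }
    };
    StreamsMetadata owner = streams.metadataForKey("my-store", "some-key", partitioner);
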
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 0486e57..6569f85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import org.apache.kafka.common.record.ByteBufferInputStream;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,19 +40,28 @@ import java.util.Set;
 public class AssignmentInfo {
 
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
-
+    /**
+     * A new field, partitionsByHostState, was added, so CURRENT_VERSION was bumped to 2.
+     * Keeping the version in the encoding lets us still decode data written with the
+     * previous version, which may occur, for example, during a rolling upgrade
+     */
+    private static final int CURRENT_VERSION = 2;
     public final int version;
     public final List<TaskId> activeTasks; // each element corresponds to a partition
     public final Map<TaskId, Set<TopicPartition>> standbyTasks;
+    public final Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
 
-    public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
-        this(1, activeTasks, standbyTasks);
+    public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+                          Map<HostInfo, Set<TopicPartition>> hostState) {
+        this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
+    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+                             Map<HostInfo, Set<TopicPartition>> hostState) {
         this.version = version;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
+        this.partitionsByHostState = hostState;
     }
 
     /**
@@ -63,43 +73,48 @@ public class AssignmentInfo {
         DataOutputStream out = new DataOutputStream(baos);
 
         try {
-            if (version == 1) {
-                // Encode version
-                out.writeInt(1);
-                // Encode active tasks
-                out.writeInt(activeTasks.size());
-                for (TaskId id : activeTasks) {
-                    id.writeTo(out);
-                }
-                // Encode standby tasks
-                out.writeInt(standbyTasks.size());
-                for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
-                    TaskId id = entry.getKey();
-                    id.writeTo(out);
-
-                    Set<TopicPartition> partitions = entry.getValue();
-                    out.writeInt(partitions.size());
-                    for (TopicPartition partition : partitions) {
-                        out.writeUTF(partition.topic());
-                        out.writeInt(partition.partition());
-                    }
-                }
-
-                out.flush();
-                out.close();
+            // Encode version
+            out.writeInt(version);
+            // Encode active tasks
+            out.writeInt(activeTasks.size());
+            for (TaskId id : activeTasks) {
+                id.writeTo(out);
+            }
+            // Encode standby tasks
+            out.writeInt(standbyTasks.size());
+            for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+                TaskId id = entry.getKey();
+                id.writeTo(out);
+
+                Set<TopicPartition> partitions = entry.getValue();
+                writeTopicPartitions(out, partitions);
+            }
+            out.writeInt(partitionsByHostState.size());
+            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHostState
+                    .entrySet()) {
+                final HostInfo hostInfo = entry.getKey();
+                out.writeUTF(hostInfo.host());
+                out.writeInt(hostInfo.port());
+                writeTopicPartitions(out, entry.getValue());
+            }
 
-                return ByteBuffer.wrap(baos.toByteArray());
+            out.flush();
+            out.close();
 
-            } else {
-                TaskAssignmentException ex = new TaskAssignmentException("Unable to encode assignment data: version=" + version);
-                log.error(ex.getMessage(), ex);
-                throw ex;
-            }
+            return ByteBuffer.wrap(baos.toByteArray());
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
         }
     }
 
+    private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> partitions) throws IOException {
+        out.writeInt(partitions.size());
+        for (TopicPartition partition : partitions) {
+            out.writeUTF(partition.topic());
+            out.writeInt(partition.partition());
+        }
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
      */
@@ -111,42 +126,55 @@ public class AssignmentInfo {
         try {
             // Decode version
             int version = in.readInt();
-            if (version == 1) {
-                // Decode active tasks
-                int count = in.readInt();
-                List<TaskId> activeTasks = new ArrayList<>(count);
-                for (int i = 0; i < count; i++) {
-                    activeTasks.add(TaskId.readFrom(in));
-                }
-                // Decode standby tasks
-                count = in.readInt();
-                Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
-                for (int i = 0; i < count; i++) {
-                    TaskId id = TaskId.readFrom(in);
-
-                    int numPartitions = in.readInt();
-                    Set<TopicPartition> partitions = new HashSet<>(numPartitions);
-                    for (int j = 0; j < numPartitions; j++) {
-                        partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
-                    }
-                    standbyTasks.put(id, partitions);
-                }
-
-                return new AssignmentInfo(activeTasks, standbyTasks);
-
-            } else {
+            if (version < 0 || version > CURRENT_VERSION) {
                 TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
                 log.error(ex.getMessage(), ex);
                 throw ex;
             }
+
+            // Decode active tasks
+            int count = in.readInt();
+            List<TaskId> activeTasks = new ArrayList<>(count);
+            for (int i = 0; i < count; i++) {
+                activeTasks.add(TaskId.readFrom(in));
+            }
+            // Decode standby tasks
+            count = in.readInt();
+            Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
+            for (int i = 0; i < count; i++) {
+                TaskId id = TaskId.readFrom(in);
+                standbyTasks.put(id, readTopicPartitions(in));
+            }
+
+            Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = new HashMap<>();
+            if (version == CURRENT_VERSION) {
+                int numEntries = in.readInt();
+                for (int i = 0; i < numEntries; i++) {
+                    HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
+                    hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in));
+                }
+            }
+
+            return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
+
+
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
         }
     }
 
+    private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throws IOException {
+        int numPartitions = in.readInt();
+        Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+        for (int j = 0; j < numPartitions; j++) {
+            partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
+        }
+        return partitions;
+    }
+
     @Override
     public int hashCode() {
-        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode();
+        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHostState.hashCode();
     }
 
     @Override
@@ -155,7 +183,8 @@ public class AssignmentInfo {
             AssignmentInfo other = (AssignmentInfo) o;
             return this.version == other.version &&
                     this.activeTasks.equals(other.activeTasks) &&
-                    this.standbyTasks.equals(other.standbyTasks);
+                    this.standbyTasks.equals(other.standbyTasks) &&
+                    this.partitionsByHostState.equals(other.partitionsByHostState);
         } else {
             return false;
         }
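
Note on the new wire format: a version-2 AssignmentInfo is laid out as the version int,
the active-task list, the standby-task map, and (new in this version) the host-to-partitions
map, with every TopicPartition set serialized as a count followed by (topic, partition)
pairs via the writeTopicPartitions/readTopicPartitions helpers above. A minimal round-trip
sketch, assuming the three-argument constructor shown in this hunk and a static
decode(ByteBuffer) counterpart to encode():

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
    import org.apache.kafka.streams.state.HostInfo;

    public class AssignmentInfoRoundTrip {
        public static void main(String[] args) {
            List<TaskId> active = Collections.singletonList(new TaskId(0, 0));
            Map<TaskId, Set<TopicPartition>> standby = Collections.singletonMap(
                    new TaskId(0, 1), Collections.singleton(new TopicPartition("t", 1)));
            // New in version 2: which endpoint hosts which partitions.
            Map<HostInfo, Set<TopicPartition>> byHost = Collections.singletonMap(
                    new HostInfo("localhost", 8080),
                    Collections.singleton(new TopicPartition("t", 0)));
            AssignmentInfo info = new AssignmentInfo(active, standby, byHost);
            // encode() and decode() should be inverses of each other.
            AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
            System.out.println(info.equals(decoded)); // expected: true
        }
    }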

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 874fea8..c3481c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -31,54 +32,59 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 1;
+    private static final int CURRENT_VERSION = 2;
 
     public final int version;
     public final UUID processId;
     public final Set<TaskId> prevTasks;
     public final Set<TaskId> standbyTasks;
+    public final String userEndPoint;
 
-    public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
-        this(CURRENT_VERSION, processId, prevTasks, standbyTasks);
+    public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+        this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
         this.version = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
+        this.userEndPoint = userEndPoint;
     }
 
     /**
      * @throws TaskAssignmentException if method fails to encode the data
      */
     public ByteBuffer encode() {
-        if (version == CURRENT_VERSION) {
-            ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
-            // version
-            buf.putInt(version);
-            // encode client UUID
-            buf.putLong(processId.getMostSignificantBits());
-            buf.putLong(processId.getLeastSignificantBits());
-            // encode ids of previously running tasks
-            buf.putInt(prevTasks.size());
-            for (TaskId id : prevTasks) {
-                id.writeTo(buf);
-            }
-            // encode ids of cached tasks
-            buf.putInt(standbyTasks.size());
-            for (TaskId id : standbyTasks) {
-                id.writeTo(buf);
-            }
-            buf.rewind();
-
-            return buf;
-
+        byte[] endPointBytes;
+        if (userEndPoint == null) {
+            endPointBytes = new byte[0];
         } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unable to encode subscription data: version=" + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+            endPointBytes = userEndPoint.getBytes(StandardCharsets.UTF_8);
         }
+        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 +
+                prevTasks.size() * 8 + 4 + standbyTasks.size() * 8
+                + 4 /* length of bytes */ + endPointBytes.length
+        );
+        // version
+        buf.putInt(version);
+        // encode client UUID
+        buf.putLong(processId.getMostSignificantBits());
+        buf.putLong(processId.getLeastSignificantBits());
+        // encode ids of previously running tasks
+        buf.putInt(prevTasks.size());
+        for (TaskId id : prevTasks) {
+            id.writeTo(buf);
+        }
+        // encode ids of cached tasks
+        buf.putInt(standbyTasks.size());
+        for (TaskId id : standbyTasks) {
+            id.writeTo(buf);
+        }
+        buf.putInt(endPointBytes.length);
+        buf.put(endPointBytes);
+        buf.rewind();
+        return buf;
     }
 
     /**
@@ -90,7 +96,7 @@ public class SubscriptionInfo {
 
         // Decode version
         int version = data.getInt();
-        if (version == CURRENT_VERSION) {
+        if (version == CURRENT_VERSION || version == 1) {
             // Decode client UUID
             UUID processId = new UUID(data.getLong(), data.getLong());
             // Decode previously active tasks
@@ -107,7 +113,17 @@ public class SubscriptionInfo {
                 standbyTasks.add(TaskId.readFrom(data));
             }
 
-            return new SubscriptionInfo(version, processId, prevTasks, standbyTasks);
+            String userEndPoint = null;
+            if (version == CURRENT_VERSION) {
+                int bytesLength = data.getInt();
+                if (bytesLength != 0) {
+                    byte[] bytes = new byte[bytesLength];
+                    data.get(bytes);
+                    userEndPoint = new String(bytes, StandardCharsets.UTF_8);
+                }
+            }
+            return new SubscriptionInfo(version, processId, prevTasks, standbyTasks, userEndPoint);
 
         } else {
             TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
@@ -118,7 +134,11 @@ public class SubscriptionInfo {
 
     @Override
     public int hashCode() {
-        return version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+        int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+        if (userEndPoint == null) {
+            return hashCode;
+        }
+        return hashCode ^ userEndPoint.hashCode();
     }
 
     @Override
@@ -128,7 +148,8 @@ public class SubscriptionInfo {
             return this.version == other.version &&
                     this.processId.equals(other.processId) &&
                     this.prevTasks.equals(other.prevTasks) &&
-                    this.standbyTasks.equals(other.standbyTasks);
+                    this.standbyTasks.equals(other.standbyTasks) &&
+                    (this.userEndPoint != null ? this.userEndPoint.equals(other.userEndPoint) : other.userEndPoint == null);
         } else {
             return false;
         }
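
A version-2 SubscriptionInfo appends the user endpoint as a 4-byte length followed by the
UTF-8 bytes; a null endpoint is written as length 0 and decodes back to null, and version-1
buffers (which have no endpoint field) still decode. A minimal round-trip sketch under those
assumptions:

    import java.util.Collections;
    import java.util.UUID;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;

    public class SubscriptionInfoRoundTrip {
        public static void main(String[] args) {
            SubscriptionInfo info = new SubscriptionInfo(
                    UUID.randomUUID(),
                    Collections.singleton(new TaskId(0, 0)),  // previously active tasks
                    Collections.<TaskId>emptySet(),           // standby tasks
                    "localhost:8080");                        // user endpoint
            SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
            System.out.println(decoded.userEndPoint);         // expected: localhost:8080
        }
    }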

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
new file mode 100644
index 0000000..37a15e1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+/**
+ * Represents a user-defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application.
+ * Instances of this class can be obtained by calling one of:
+ *  {@link KafkaStreams#allMetadata()}
+ *  {@link KafkaStreams#allMetadataForStore(String)}
+ *  {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)}
+ *  {@link KafkaStreams#metadataForKey(String, Object, Serializer)}
+ *
+ *  The HostInfo is constructed during partition assignment
+ *  (see {@link org.apache.kafka.streams.processor.internals.StreamPartitionAssignor}) and is
+ *  extracted from the {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} config.
+ *
+ *  If developers wish to expose an endpoint in their KafkaStreams applications, they should
+ *  provide this config.
+ */
+public class HostInfo {
+    private final String host;
+    private final int port;
+
+    public HostInfo(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        HostInfo hostInfo = (HostInfo) o;
+
+        if (port != hostInfo.port) return false;
+        return host.equals(hostInfo.host);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = host.hashCode();
+        result = 31 * result + port;
+        return result;
+    }
+
+    public String host() {
+        return host;
+    }
+
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public String toString() {
+        return "HostInfo{" +
+                "host='" + host + '\'' +
+                ", port=" + port +
+                '}';
+    }
+}
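
As the class javadoc says, a HostInfo is only available for instances that set
StreamsConfig.APPLICATION_SERVER_CONFIG. A hedged configuration sketch (the application id,
bootstrap servers, and host:port values are illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class EndpointConfigExample {
        public static Properties config() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            // host:port this instance can be reached on; the partition assignor
            // parses it into the HostInfo that other instances discover.
            props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
            return props;
        }
    }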

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
new file mode 100644
index 0000000..541221f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user-supplied {@link HostInfo} that developers can use to build APIs and
+ * services for connecting to other instances, the set of state stores available on the
+ * instance, and the set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point-in-time view; it may change whenever a rebalance happens.
+ */
+public class StreamsMetadata {
+    private final HostInfo hostInfo;
+    private final Set<String> stateStoreNames;
+    private final Set<TopicPartition> topicPartitions;
+
+    public StreamsMetadata(final HostInfo hostInfo,
+                           final Set<String> stateStoreNames,
+                           final Set<TopicPartition> topicPartitions) {
+
+        this.hostInfo = hostInfo;
+        this.stateStoreNames = stateStoreNames;
+        this.topicPartitions = topicPartitions;
+    }
+
+    public HostInfo hostInfo() {
+        return hostInfo;
+    }
+
+    public Set<String> stateStoreNames() {
+        return stateStoreNames;
+    }
+
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    public String host() {
+        return hostInfo.host();
+    }
+
+    public int port() {
+        return hostInfo.port();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        final StreamsMetadata that = (StreamsMetadata) o;
+
+        if (!hostInfo.equals(that.hostInfo)) return false;
+        if (!stateStoreNames.equals(that.stateStoreNames)) return false;
+        return topicPartitions.equals(that.topicPartitions);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = hostInfo.hashCode();
+        result = 31 * result + stateStoreNames.hashCode();
+        result = 31 * result + topicPartitions.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "StreamsMetadata{" +
+                "hostInfo=" + hostInfo +
+                ", stateStoreNames=" + stateStoreNames +
+                ", topicPartitions=" + topicPartitions +
+                '}';
+    }
+}
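
A hedged usage sketch for the discovery methods that return StreamsMetadata; the store name
"word-count" and the key are assumptions, and the KafkaStreams instance must already be
running (the KafkaStreamsTest changes below assert that these calls throw
IllegalStateException before start()):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public class DiscoveryExample {
        static void printStoreHosts(final KafkaStreams streams) {
            // Every instance currently hosting the "word-count" store.
            for (final StreamsMetadata md : streams.allMetadataForStore("word-count")) {
                System.out.println(md.host() + ":" + md.port() + " -> " + md.stateStoreNames());
            }
            // The single instance whose active partition holds this key.
            final StreamsMetadata owner =
                    streams.metadataForKey("word-count", "hello", Serdes.String().serializer());
            System.out.println("key 'hello' is on " + owner.hostInfo());
        }
    }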

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 29b8b03..b15c2ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
@@ -34,7 +36,7 @@ import static org.junit.Assert.assertTrue;
 public class KafkaStreamsTest {
 
     // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
-    // quick enough
+    // quickly enough)
     @ClassRule
     public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
 
@@ -118,6 +120,45 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+        KafkaStreams streams = createKafkaStreams();
+        streams.allMetadata();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+        KafkaStreams streams = createKafkaStreams();
+        streams.allMetadataForStore("store");
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+        KafkaStreams streams = createKafkaStreams();
+        streams.metadataForKey("store", "key", Serdes.String().serializer());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+        KafkaStreams streams = createKafkaStreams();
+        streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
+            @Override
+            public Integer partition(final String key, final Object value, final int numPartitions) {
+                return 0;
+            }
+        });
+    }
+
+    private KafkaStreams createKafkaStreams() {
+        Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        KStreamBuilder builder = new KStreamBuilder();
+        return new KafkaStreams(builder, props);
+    }
+
     @Test
     public void testCleanup() throws Exception {
         final Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 5a30af5..0892893 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
@@ -338,7 +339,7 @@ public class RegexSourceIntegrationTest {
         private int index =  0;
 
         public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
-            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
+            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index c776b8a..b951743 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -26,6 +26,9 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.After;
 import org.junit.Test;
 
+import java.util.Map;
+import java.util.Set;
+
 import static org.junit.Assert.assertEquals;
 
 public class KStreamBuilderTest {
@@ -89,6 +92,54 @@ public class KStreamBuilderTest {
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
     }
 
+    @Test
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+        final String topic3 = "topic-3";
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        final KStream<String, String> source3 = builder.stream(topic3);
+
+        final KStream<String, String> merged = builder.merge(source1, source2, source3);
+        merged.groupByKey().count("my-table");
+        final Map<String, Set<String>> actual = builder.stateStoreNameToSourceTopics();
+        assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
+    }
+
+    @Test
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStreamWithProcessors() throws Exception {
+        final String topic1 = "topic-1";
+        final String topic2 = "topic-2";
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> source1 = builder.stream(topic1);
+        final KStream<String, String> source2 = builder.stream(topic2);
+        final KStream<String, String> processedSource1 =
+                source1.mapValues(new ValueMapper<String, String>() {
+                    @Override
+                    public String apply(final String value) {
+                        return value;
+                    }
+                }).filter(new Predicate<String, String>() {
+                    @Override
+                    public boolean test(final String key, final String value) {
+                        return true;
+                    }
+                });
+        final KStream<String, String> processedSource2 = source2.filter(new Predicate<String, String>() {
+            @Override
+            public boolean test(final String key, final String value) {
+                return true;
+            }
+        });
+
+        final KStream<String, String> merged = builder.merge(processedSource1, processedSource2);
+        merged.groupByKey().count("my-table");
+        final Map<String, Set<String>> actual = builder.stateStoreNameToSourceTopics();
+        assertEquals(Utils.mkSet("topic-1", "topic-2"), actual.get("my-table"));
+    }
+
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
         new KStreamBuilder().stream();

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index f6ca6db..6f047b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -404,4 +404,39 @@ public class TopologyBuilderTest {
         return nodeNames;
     }
 
+    @Test
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source", "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), true, "processor");
+        final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
+    @Test
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source", "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), false, "processor");
+        final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
+    @Test
+    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId("appId");
+        builder.addInternalTopic("internal-topic");
+        builder.addSource("source", "internal-topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), true, "processor");
+        final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
+        assertEquals(1, stateStoreNameToSourceTopic.size());
+        assertEquals(Collections.singleton("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
+    }
+
 }
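
These builder tests pin down the stateStoreNameToSourceTopics() mapping that the discovery
API relies on: to answer metadataForKey(), Streams must be able to walk from a store name
back to its source topics (including merged and internal ones) and hence to a partition.
A minimal sketch of the expected shape, with illustrative topic and store names:

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StoreToTopicsExample {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input").groupByKey().count("counts");
            // Expected shape: { "counts" -> [ "input" ] }
            Map<String, Set<String>> mapping = builder.stateStoreNameToSourceTopics();
            System.out.println(mapping);
        }
    }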