Posted to commits@kafka.apache.org by gw...@apache.org on 2016/01/22 22:00:09 UTC

[1/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams

Repository: kafka
Updated Branches:
  refs/heads/trunk 91ba074e4 -> 21c6cfe50


http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
new file mode 100644
index 0000000..15b114a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamPartitionAssignorTest {
+
+    private TopicPartition t1p0 = new TopicPartition("topic1", 0);
+    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
+    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
+    private TopicPartition t2p0 = new TopicPartition("topic2", 0);
+    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
+    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
+    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
+    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
+    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
+    private TopicPartition t3p3 = new TopicPartition("topic3", 3);
+
+    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
+
+    private List<PartitionInfo> infos = Arrays.asList(
+            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
+    );
+
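+    // metadata for a mock cluster: a single placeholder node hosting all ten partitions of the three topics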
+    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+    private final TaskId task0 = new TaskId(0, 0);
+    private final TaskId task1 = new TaskId(0, 1);
+    private final TaskId task2 = new TaskId(0, 2);
+    private final TaskId task3 = new TaskId(0, 3);
+
+    private Properties configProps() {
+        return new Properties() {
+            {
+                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-partition-assignor-test");
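+                // bootstrap.servers must be set for StreamsConfig validation; the mock clients in these tests never connect to it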
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+            }
+        };
+    }
+
+    private ByteArraySerializer serializer = new ByteArraySerializer();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSubscription() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+
+        final Set<TaskId> prevTasks = Utils.mkSet(
+                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
+        final Set<TaskId> cachedTasks = Utils.mkSet(
+                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
+                new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
+
+        String clientId = "client-id";
+        UUID processId = UUID.randomUUID();
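+        // stub out prevTasks() and cachedTasks() so the subscription is built from a known task history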
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
+            @Override
+            public Set<TaskId> prevTasks() {
+                return prevTasks;
+            }
+            @Override
+            public Set<TaskId> cachedTasks() {
+                return cachedTasks;
+            }
+        };
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
+
+        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
+
+        Collections.sort(subscription.topics());
+        assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());
+
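+        // standby tasks should be the cached tasks minus the previously active tasks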
+        Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
+        standbyTasks.removeAll(prevTasks);
+
+        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
+        assertEquals(info.encode(), subscription.userData());
+    }
+
+    @Test
+    public void testAssignBasic() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+        List<String> topics = Utils.mkList("topic1", "topic2");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
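+        // each consumer reports one previous active task and one standby task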
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
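+        // two consumers belong to the first process (uuid1) and one to the second (uuid2)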
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check assigned partitions
+        assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
+                Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
+        assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
+
+        // check assignment info
+
+        Set<TaskId> allActiveTasks = new HashSet<>();
+
+        // the first consumer
+        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+
+        // the second consumer
+        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        allActiveTasks.addAll(info11.activeTasks);
+
+        assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
+
+        // the third consumer
+        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        allActiveTasks.addAll(info20.activeTasks);
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, allActiveTasks);
+    }
+
+    @Test
+    public void testAssignWithNewTasks() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addSource("source3", "topic3");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
+        List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
+
+        // assuming that previous tasks do not have topic3
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check assigned partitions: since there are no previous tasks for topic3, its partitions are assigned randomly, so we cannot check for an exact match;
+        // also note that previously assigned partitions / tasks may not stay on their previous hosts, since new tasks may be assigned first
+        // and the remaining ones re-assigned to other hosts for load balancing
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        Set<TopicPartition> allPartitions = new HashSet<>();
+        AssignmentInfo info;
+
+        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer10").partitions());
+
+        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer11").partitions());
+
+        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        allActiveTasks.addAll(info.activeTasks);
+        allPartitions.addAll(assignments.get("consumer20").partitions());
+
+        assertEquals(allTasks, allActiveTasks);
+        assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
+    }
+
+    @Test
+    public void testAssignWithStates() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+
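+        // two independent sub-topologies: processor-1 backed by store1, processor-2 backed by store2 and store3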
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
+        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
+
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
+        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
+        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
+
+        List<String> topics = Utils.mkList("topic1", "topic2");
+
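+        // task group 0 maps to the sub-topology of source1, group 1 to the sub-topology of source2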
+        TaskId task00 = new TaskId(0, 0);
+        TaskId task01 = new TaskId(0, 1);
+        TaskId task02 = new TaskId(0, 2);
+        TaskId task10 = new TaskId(1, 0);
+        TaskId task11 = new TaskId(1, 1);
+        TaskId task12 = new TaskId(1, 2);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check assigned partition counts: since there are no previous tasks and there are two sub-topologies, the assignment is random, so we cannot check for an exact match
+        assertEquals(2, assignments.get("consumer10").partitions().size());
+        assertEquals(2, assignments.get("consumer11").partitions().size());
+        assertEquals(2, assignments.get("consumer20").partitions().size());
+
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
+        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
+
+        // check tasks for state topics
+        assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
+        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
+        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicas() throws Exception {
+        Properties props = configProps();
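+        // request one standby replica for each active task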
+        props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
+        StreamsConfig config = new StreamsConfig(props);
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+        List<String> topics = Utils.mkList("topic1", "topic2");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+        String client2 = "client2";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        Set<TaskId> allStandbyTasks = new HashSet<>();
+
+        // the first consumer
+        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+        allStandbyTasks.addAll(info10.standbyTasks.keySet());
+
+        // the second consumer
+        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        allActiveTasks.addAll(info11.activeTasks);
+        allStandbyTasks.addAll(info11.standbyTasks.keySet());
+
+        // check active tasks assigned to the first client
+        assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+        assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
+
+        // the third consumer
+        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        allActiveTasks.addAll(info20.activeTasks);
+        allStandbyTasks.addAll(info20.standbyTasks.keySet());
+
+        // every task id should appear both among the active tasks and among the standby tasks
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, allActiveTasks);
+
+        assertEquals(3, allStandbyTasks.size());
+        assertEquals(allTasks, allStandbyTasks);
+    }
+
+    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+        // This assumes that 1) the DefaultPartitionGrouper is used, and 2) there is only one topic group.
+
+        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+        // check if the number of assigned partitions == the size of the active task id list
+        assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+        // check if active tasks are consistent
+        List<TaskId> activeTasks = new ArrayList<>();
+        Set<String> activeTopics = new HashSet<>();
+        for (TopicPartition partition : assignment.partitions()) {
+            // with the default grouper, taskId.partition == partition.partition()
+            activeTasks.add(new TaskId(0, partition.partition()));
+            activeTopics.add(partition.topic());
+        }
+        assertEquals(activeTasks, info.activeTasks);
+
+        // check if active partitions cover all topics
+        assertEquals(allTopics, activeTopics);
+
+        // check if standby tasks are consistent
+        Set<String> standbyTopics = new HashSet<>();
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+            TaskId id = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+            for (TopicPartition partition : partitions) {
+                // with the default grouper, taskId.partition == partition.partition()
+                assertEquals(id.partition, partition.partition());
+
+                standbyTopics.add(partition.topic());
+            }
+        }
+
+        // check if standby partitions cover all topics
+        if (info.standbyTasks.size() > 0)
+            assertEquals(allTopics, standbyTopics);
+
+        return info;
+    }
+
+    @Test
+    public void testOnAssignment() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopicPartition t2p3 = new TopicPartition("topic2", 3);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+
+        UUID uuid = UUID.randomUUID();
+        String client1 = "client1";
+
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
+
+        List<TaskId> activeTaskList = Utils.mkList(task0, task3);
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
+        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+
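+        // encode the assignment info into userData as the group leader would, then feed it back through onAssignment()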
+        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
+        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
+        partitionAssignor.onAssignment(assignment);
+
+        assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
+        assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
+        assertEquals(standbyTasks, partitionAssignor.standbyTasks());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1847e85..bf3b3b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockSourceNode;
@@ -69,17 +69,18 @@ public class StreamTaskTest {
             Collections.<StateStoreSupplier>emptyList()
     );
 
-    private StreamingConfig createConfig(final File baseDir) throws Exception {
-        return new StreamingConfig(new Properties() {
+    private StreamsConfig createConfig(final File baseDir) throws Exception {
+        return new StreamsConfig(new Properties() {
             {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
             }
         });
     }
@@ -102,7 +103,7 @@ public class StreamTaskTest {
     public void testProcessOrder() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
@@ -153,7 +154,7 @@ public class StreamTaskTest {
     public void testPauseResume() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5f0347d..2d531bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -37,7 +37,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -112,13 +112,14 @@ public class StreamThreadTest {
     private Properties configProps() {
         return new Properties() {
             {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test");
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             }
         };
     }
@@ -132,7 +133,7 @@ public class StreamThreadTest {
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
-                              StreamingConfig config) {
+                              StreamsConfig config) {
             super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
@@ -148,7 +149,7 @@ public class StreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPartitionAssignmentChange() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
+        StreamsConfig config = new StreamsConfig(configProps());
 
         MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
         MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -258,10 +259,10 @@ public class StreamThreadTest {
         try {
             final long cleanupDelay = 1000L;
             Properties props = configProps();
-            props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
-            props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+            props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
+            props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
 
-            StreamingConfig config = new StreamingConfig(props);
+            StreamsConfig config = new StreamsConfig(props);
 
             File stateDir1 = new File(baseDir, task1.toString());
             File stateDir2 = new File(baseDir, task2.toString());
@@ -389,10 +390,10 @@ public class StreamThreadTest {
         try {
             final long commitInterval = 1000L;
             Properties props = configProps();
-            props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-            props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
+            props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+            props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
 
-            StreamingConfig config = new StreamingConfig(props);
+            StreamsConfig config = new StreamsConfig(props);
 
             MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
             MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -468,9 +469,9 @@ public class StreamThreadTest {
         }
     }
 
-    private void initPartitionGrouper(StreamingConfig config, StreamThread thread) {
+    private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {
 
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
         partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index b0c9bd7..36e487b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -23,12 +23,13 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
@@ -221,10 +222,10 @@ public class KeyValueStoreTestDriver<K, V> {
     private final Serdes<K, V> serdes;
     private final Map<K, V> flushedEntries = new HashMap<>();
     private final Set<K> flushedRemovals = new HashSet<>();
-    private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
+    private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
     private final MockProcessorContext context;
     private final Map<String, StateStore> storeMap = new HashMap<>();
-    private final StreamingMetrics metrics = new StreamingMetrics() {
+    private final StreamsMetrics metrics = new StreamsMetrics() {
         @Override
         public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
             return null;
@@ -248,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
-                                    StreamPartitioner<K1, V1> partitioner) {
+                                    StreamsPartitioner<K1, V1> partitioner) {
                 recordFlushed(record.key(), record.value());
             }
         };
@@ -256,12 +257,12 @@ public class KeyValueStoreTestDriver<K, V> {
         this.stateDir.mkdirs();
 
         Properties props = new Properties();
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
 
         this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
                 serdes.valueDeserializer(), recordCollector) {
@@ -287,7 +288,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
 
             @Override
-            public StreamingMetrics metrics() {
+            public StreamsMetrics metrics() {
                 return metrics;
             }
 
@@ -313,10 +314,10 @@ public class KeyValueStoreTestDriver<K, V> {
     }
 
     private void restoreEntries(StateRestoreCallback func) {
-        for (Entry<K, V> entry : restorableEntries) {
+        for (KeyValue<K, V> entry : restorableEntries) {
             if (entry != null) {
-                byte[] rawKey = serdes.rawKey(entry.key());
-                byte[] rawValue = serdes.rawValue(entry.value());
+                byte[] rawKey = serdes.rawKey(entry.key);
+                byte[] rawValue = serdes.rawValue(entry.value);
                 func.restore(rawKey, rawValue);
             }
         }
@@ -352,7 +353,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @see #checkForRestoredEntries(KeyValueStore)
      */
     public void addEntryToRestoreLog(K key, V value) {
-        restorableEntries.add(new Entry<K, V>(key, value));
+        restorableEntries.add(new KeyValue<K, V>(key, value));
     }
 
     /**
@@ -376,7 +377,7 @@ public class KeyValueStoreTestDriver<K, V> {
      *
      * @return the restore entries; never null but possibly a null iterator
      */
-    public Iterable<Entry<K, V>> restoredEntries() {
+    public Iterable<KeyValue<K, V>> restoredEntries() {
         return restorableEntries;
     }
 
@@ -390,10 +391,10 @@ public class KeyValueStoreTestDriver<K, V> {
      */
     public int checkForRestoredEntries(KeyValueStore<K, V> store) {
         int missing = 0;
-        for (Entry<K, V> entry : restorableEntries) {
-            if (entry != null) {
-                V value = store.get(entry.key());
-                if (!Objects.equals(value, entry.value())) {
+        for (KeyValue<K, V> kv : restorableEntries) {
+            if (kv != null) {
+                V value = store.get(kv.key);
+                if (!Objects.equals(value, kv.value)) {
                     ++missing;
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 2ed698c..8effd77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -21,8 +21,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
@@ -73,11 +73,11 @@ public abstract class AbstractKeyValueStoreTest {
             // Check range iteration ...
             try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
                 while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
+                    KeyValue<Integer, String> entry = iter.next();
+                    if (entry.key.equals(2))
+                        assertEquals("two", entry.value);
+                    else if (entry.key.equals(4))
+                        assertEquals("four", entry.value);
                     else
                         fail("Unexpected entry: " + entry);
                 }
@@ -86,11 +86,11 @@ public abstract class AbstractKeyValueStoreTest {
             // Check range iteration ...
             try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
                 while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
+                    KeyValue<Integer, String> entry = iter.next();
+                    if (entry.key.equals(2))
+                        assertEquals("two", entry.value);
+                    else if (entry.key.equals(4))
+                        assertEquals("four", entry.value);
                     else
                         fail("Unexpected entry: " + entry);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 80ad67f..45448e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.Serdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -69,12 +69,12 @@ public class RocksDBWindowStoreTest {
     public void testPutAndFetch() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -165,12 +165,12 @@ public class RocksDBWindowStoreTest {
     public void testPutAndFetchBefore() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -261,12 +261,12 @@ public class RocksDBWindowStoreTest {
     public void testPutAndFetchAfter() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -357,12 +357,12 @@ public class RocksDBWindowStoreTest {
     public void testPutSameKeyTimestamp() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -416,12 +416,12 @@ public class RocksDBWindowStoreTest {
     public void testRolling() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -528,7 +528,7 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void testRestore() throws IOException {
-        final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
         long startTime = segmentSize * 2;
         long incr = segmentSize / 2;
 
@@ -538,7 +538,7 @@ public class RocksDBWindowStoreTest {
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -587,7 +587,7 @@ public class RocksDBWindowStoreTest {
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
+                    changeLog.add(new KeyValue<>(
                                     keySerializer.serialize(record.topic(), record.key()),
                                     valueSerializer.serialize(record.topic(), record.value()))
                     );
@@ -655,13 +655,13 @@ public class RocksDBWindowStoreTest {
         return set;
     }
 
-    private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) {
+    private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
         HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
 
-        for (Entry<byte[], byte[]> entry : changeLog) {
-            long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key());
-            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes);
-            String value = entry.value() == null ? null : serdes.valueFrom(entry.value());
+        for (KeyValue<byte[], byte[]> entry : changeLog) {
+            long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key);
+            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key, serdes);
+            String value = entry.value == null ? null : serdes.valueFrom(entry.value);
 
             Set<String> entries = entriesByKey.get(key);
             if (entries == null) {

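For readers tracking the API change in the hunks above: the old org.apache.kafka.streams.state.Entry pair exposed key() and value() accessor methods, while the replacement org.apache.kafka.streams.KeyValue exposes plain public fields. A minimal sketch of the migrated pattern, with placeholder byte-array contents:

import org.apache.kafka.streams.KeyValue;

import java.util.ArrayList;
import java.util.List;

public class KeyValueMigrationSketch {
    public static void main(String[] args) {
        // Collect change-log records as KeyValue pairs, as the tests above now do.
        List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
        changeLog.add(new KeyValue<>(new byte[] {1}, new byte[] {42}));

        for (KeyValue<byte[], byte[]> entry : changeLog) {
            byte[] key = entry.key;     // field access replaces entry.key()
            byte[] value = entry.value; // field access replaces entry.value()
        }
    }
}
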
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2dc567e..8f8e00f 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -130,7 +130,7 @@ public class KStreamTestDriver {
         
         @Override
         public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                                StreamPartitioner<K, V> partitioner) {
+                                StreamsPartitioner<K, V> partitioner) {
             // The serialization is skipped.
             process(record.topic(), record.key(), record.value());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index a6a29cd..cb7a95c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -18,7 +18,8 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -26,7 +27,6 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Entry;
 
 import java.io.File;
 import java.util.Collections;
@@ -123,8 +123,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public StreamingMetrics metrics() {
-        return new StreamingMetrics() {
+    public StreamsMetrics metrics() {
+        return new StreamsMetrics() {
             @Override
             public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
                 return null;
@@ -192,10 +192,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         return Collections.unmodifiableMap(storeMap);
     }
 
-    public void restore(String storeName, List<Entry<byte[], byte[]>> changeLog) {
+    public void restore(String storeName, List<KeyValue<byte[], byte[]>> changeLog) {
         StateRestoreCallback restoreCallback = restoreFuncs.get(storeName);
-        for (Entry<byte[], byte[]> entry : changeLog) {
-            restoreCallback.restore(entry.key(), entry.value());
+        for (KeyValue<byte[], byte[]> entry : changeLog) {
+            restoreCallback.restore(entry.key, entry.value);
         }
     }
 }

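The reworked restore(...) above is exercised the same way as before, only with KeyValue records. A minimal, hypothetical test fragment, assuming a MockProcessorContext instance whose state store "myStore" has already registered its StateRestoreCallback (the store name and the setup are illustrative):

import org.apache.kafka.streams.KeyValue;

import java.util.ArrayList;
import java.util.List;

// Assumed setup: MockProcessorContext context = ...; store "myStore" registered.
List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
changeLog.add(new KeyValue<>("k1".getBytes(), "v1".getBytes()));

// Replays each record into the StateRestoreCallback registered for "myStore";
// entry.key and entry.value are plain fields on KeyValue.
context.restore("myStore", changeLog);
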
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
index 98aa0d4..828b5ae 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.test;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 
 public class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index eaeed09..af6d51b 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * can use and test code you already have that uses a builder to create topologies. Best of all, the class works without a real
  * Kafka broker, so the tests execute very quickly with very little overhead.
  * <p>
- * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamingConfig} and a
+ * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamsConfig} and a
  * TopologyBuilder, use the driver to supply an input message to the topology, and then use the driver to read and verify any
  * messages output by the topology.
  * <p>
@@ -65,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong;
  *
  * <h2>Driver setup</h2>
  * <p>
- * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamingConfig}. The
+ * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamsConfig}. The
  * configuration needs to be representative of what you'd supply to the real topology, so that means including several key
  * properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list
  * (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
@@ -74,13 +74,13 @@ import java.util.concurrent.atomic.AtomicLong;
  * StringSerializer strSerializer = new StringSerializer();
  * StringDeserializer strDeserializer = new StringDeserializer();
  * Properties props = new Properties();
- * props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- * props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- * props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * StreamingConfig config = new StreamingConfig(props);
+ * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ * props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+ * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
+ * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
+ * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
+ * props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
+ * StreamsConfig config = new StreamsConfig(props);
  * TopologyBuilder builder = ...
  * ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
  * </pre>
@@ -139,11 +139,11 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Create a new test driver instance.
-     * @param config the streaming configuration for the topology
+     * @param config the stream configuration for the topology
      * @param builder the topology builder that will be used to create the topology instance
      * @param storeNames the optional names of the state stores that are used by the topology
      */
-    public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) {
+    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
         id = new TaskId(0, 0);
         topology = builder.build(null);
 
@@ -173,7 +173,7 @@ public class ProcessorTopologyTestDriver {
             producer,
             restoreStateConsumer,
             config,
-            new StreamingMetrics() {
+            new StreamsMetrics() {
                 @Override
                 public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
                     return null;
@@ -265,7 +265,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Get the {@link StateStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
@@ -281,7 +281,7 @@ public class ProcessorTopologyTestDriver {
 
     /**
      * Get the {@link KeyValueStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is
+     * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
      * presumed to be used by a Processor within the topology.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to

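Putting the renamed pieces together, driver construction now reads as in the sketch below. Only the ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) constructor shown above is assumed; the topology contents and the store name are placeholders:

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.ProcessorTopologyTestDriver;

import java.util.Properties;

public class DriverSetupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        // ... key/value (de)serializer properties as in the Javadoc fragment above ...
        StreamsConfig config = new StreamsConfig(props);

        TopologyBuilder builder = new TopologyBuilder();
        // ... add sources, processors, sinks, and state stores here ...

        // "myStore" is an illustrative name of a state store used by the topology.
        ProcessorTopologyTestDriver driver =
                new ProcessorTopologyTestDriver(config, builder, "myStore");
    }
}
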

[3/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams

Posted by gw...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
deleted file mode 100644
index f14d9d9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-/**
- * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
- * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
- * <p>
- * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
- * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
- * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
- * <p>
- * When a topology is instantiated, each of its sources is assigned a subset of that topic's partitions. That means that only
- * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
- * automatically manage these instances, and adjust when new topology instances are added or removed.
- * <p>
- * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
- * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
- * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
- * determine to which partition each message should be written.
- * <p>
- * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
- * for that topic.
- * <p>
- * All StreamPartitioner implementations should be stateless pure functions so they can be shared across topic and sink nodes.
- * 
- * @param <K> the type of keys
- * @param <V> the type of values
- * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
- *      org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
- * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
- */
-public interface StreamPartitioner<K, V> {
-
-    /**
-     * Determine the partition number for a message with the given key and value and the current number of partitions.
-     * 
-     * @param key the key of the message
-     * @param value the value of the message
-     * @param numPartitions the total number of partitions
-     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
-     */
-    Integer partition(K key, V value, int numPartitions);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
new file mode 100644
index 0000000..f8d199d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
+ * <p>
+ * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
+ * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
+ * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
+ * <p>
+ * When a topology is instantiated, each of its sources is assigned a subset of that topic's partitions. That means that only
+ * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
+ * automatically manage these instances, and adjust when new topology instances are added or removed.
+ * <p>
+ * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
+ * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each message should be written.
+ * <p>
+ * To do this, create a <code>StreamsPartitioner</code> implementation, and when you build your topology specify that custom partitioner
+ * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink}
+ * for that topic.
+ * <p>
+ * All StreamsPartitioner implementations should be stateless pure functions so they can be shared across topic and sink nodes.
+ * 
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ *      org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...)
+ * @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...)
+ */
+public interface StreamsPartitioner<K, V> {
+
+    /**
+     * Determine the partition number for a message with the given key and value and the current number of partitions.
+     * 
+     * @param key the key of the message
+     * @param value the value of the message
+     * @param numPartitions the total number of partitions
+     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+     */
+    Integer partition(K key, V value, int numPartitions);
+}

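To make the renamed contract concrete, here is a minimal, illustrative implementation (the class name is invented): it routes records by key hash, stays stateless and pure as the Javadoc above requires, and returns null to fall back to the default partitioning logic:

import org.apache.kafka.streams.processor.StreamsPartitioner;

public class KeyHashPartitioner<V> implements StreamsPartitioner<String, V> {

    @Override
    public Integer partition(String key, V value, int numPartitions) {
        if (key == null)
            return null; // defer to the producer's default partitioner
        // Mask the sign bit so the result is always in [0, numPartitions).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
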
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index d6b63d2..f4e6821 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -46,8 +46,8 @@ import java.util.Set;
  * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
 * processes those messages, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
  * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreaming}
- * instance that will then {@link org.apache.kafka.streams.KafkaStreaming#start() begin consuming, processing, and producing messages}.
+ * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
+ * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing messages}.
  */
 public class TopologyBuilder {
 
@@ -135,9 +135,9 @@ public class TopologyBuilder {
         public final String topic;
         private Serializer keySerializer;
         private Serializer valSerializer;
-        private final StreamPartitioner partitioner;
+        private final StreamsPartitioner partitioner;
 
-        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
+        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) {
             super(name);
             this.parents = parents.clone();
             this.topic = topic;
@@ -188,9 +188,9 @@ public class TopologyBuilder {
 
     /**
      * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
@@ -208,11 +208,11 @@ public class TopologyBuilder {
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
      * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
@@ -236,18 +236,18 @@ public class TopologyBuilder {
 
     /**
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
-     * {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
+     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
-     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, StreamsPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
      */
     public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
@@ -256,11 +256,11 @@ public class TopologyBuilder {
     /**
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
      * the supplied partitioner.
-     * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
-     * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamingConfig streaming configuration}.
+     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+     * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
      * the named Kafka topic's partitions. Such control is often useful with topologies that use
      * {@link #addStateStore(StateStoreSupplier, String...) state stores}
     * in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -274,9 +274,9 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
+    public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
     }
 
@@ -284,7 +284,7 @@ public class TopologyBuilder {
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
      * The sink will use the specified key and value serializers.
      * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+     * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
      * the named Kafka topic's partitions. Such control is often useful with topologies that use
      * {@link #addStateStore(StateStoreSupplier, String...) state stores}
     * in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -293,20 +293,20 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * @see #addSink(String, String, StreamsPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
      */
     public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
-        return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
+        return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames);
     }
 
     /**
@@ -316,20 +316,20 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
-     * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
-     * {@link StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
-     * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param partitioner the function that should be used to determine the partition for each message processed by the sink
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, StreamsPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      */
-    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
+    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner, String... parentNames) {
         if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
@@ -589,9 +589,9 @@ public class TopologyBuilder {
 
     /**
      * Build the topology for the specified topic group. This is called automatically when passing this builder into the
-     * {@link org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)} constructor.
+     * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
      *
-     * @see org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)
+     * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
     public ProcessorTopology build(Integer topicGroupId) {
         Set<String> nodeGroup;

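For illustration, a custom partitioner plugs into a sink through the addSink(String, String, StreamsPartitioner, String...) overload shown above. The node and topic names below are placeholders, and KeyHashPartitioner is the invented example from the earlier sketch:

import org.apache.kafka.streams.processor.TopologyBuilder;

// Sketch: route the sink's output through a custom StreamsPartitioner.
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "input-topic");
builder.addSink("sink", "output-topic",
        new KeyHashPartitioner<byte[]>(), // illustrative partitioner from the sketch above
        "source");
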
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 569f4e6..ef4c3c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -48,7 +48,7 @@ public abstract class AbstractTask {
                            ProcessorTopology topology,
                            Consumer<byte[], byte[]> consumer,
                            Consumer<byte[], byte[]> restoreConsumer,
-                           StreamingConfig config,
+                           StreamsConfig config,
                            boolean isStandby) {
         this.id = id;
         this.partitions = new HashSet<>(partitions);
@@ -57,7 +57,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+            File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString());
             // if partitions is null, this is a standby task
             this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
deleted file mode 100644
index 2734f56..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.ClientState;
-import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.ZooDefs;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.I0Itec.zkclient.ZkClient;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
-
-    private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
-
-    private StreamThread streamThread;
-
-    private int numStandbyReplicas;
-    private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
-    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-    private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
-    private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
-    private Map<TaskId, Set<TopicPartition>> standbyTasks;
-
-    // TODO: the following ZK dependency should be removed after KIP-4
-    private static final String ZK_TOPIC_PATH = "/brokers/topics";
-    private static final String ZK_BROKER_PATH = "/brokers/ids";
-    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
-
-    private ZkClient zkClient;
-
-    private class ZKStringSerializer implements ZkSerializer {
-
-        @Override
-        public byte[] serialize(Object data) {
-            try {
-                return ((String) data).getBytes("UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-
-        @Override
-        public Object deserialize(byte[] bytes) {
-            try {
-                if (bytes == null)
-                    return null;
-                else
-                    return new String(bytes, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-    }
-
-    private List<Integer> getBrokers() {
-        List<Integer> brokers = new ArrayList<>();
-        for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
-            brokers.add(Integer.parseInt(broker));
-        }
-        Collections.sort(brokers);
-
-        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
-
-        return brokers;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
-        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
-
-        if (data == null) return null;
-
-        try {
-            ObjectMapper mapper = new ObjectMapper();
-
-            Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
-
-            });
-
-            Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
-
-            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
-
-            return partitions;
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
-        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
-
-        // we always assign leaders to brokers starting at the first one with replication factor 1
-        List<Integer> brokers = getBrokers();
-
-        Map<Integer, List<Integer>> assignment = new HashMap<>();
-        for (int i = 0; i < numPartitions; i++) {
-            assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", assignment);
-
-            ObjectMapper mapper = new ObjectMapper();
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-        } catch (JsonProcessingException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    private void deleteTopic(String topic) throws ZkNodeExistsException {
-        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
-
-        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
-    }
-
-    private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
-        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
-
-        // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
-        List<Integer> brokers = getBrokers();
-
-        int startIndex = existingAssignment.size();
-
-        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
-
-        for (int i = 0; i < numPartitions; i++) {
-            newAssignment.put(i + startIndex, Collections.singletonList(brokers.get(i + startIndex) % brokers.size()));
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", newAssignment);
-
-            ObjectMapper mapper = new ObjectMapper();
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
-        } catch (JsonProcessingException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    /**
-     * We need the PartitionAssignor and its StreamThread to be mutually accessible
-     * since the former needs the latter's cached metadata while sending subscriptions,
-     * and the latter needs the former's returned assignment when adding tasks.
-     */
-    @Override
-    public void configure(Map<String, ?> configs) {
-        numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
-        Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE);
-        if (o == null) {
-            KafkaException ex = new KafkaException("StreamThread is not specified");
-            log.error(ex.getMessage(), ex);
-            throw ex;
-        }
-
-        if (!(o instanceof StreamThread)) {
-            KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
-            log.error(ex.getMessage(), ex);
-            throw ex;
-        }
-
-        streamThread = (StreamThread) o;
-        streamThread.partitionAssignor(this);
-
-        this.topicGroups = streamThread.builder.topicGroups();
-
-        if (configs.containsKey(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG))
-            zkClient = new ZkClient((String) configs.get(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
-    }
-
-    @Override
-    public String name() {
-        return "streaming";
-    }
-
-    @Override
-    public Subscription subscription(Set<String> topics) {
-        // Adds the following information to subscription
-        // 1. Client UUID (a unique id assigned to an instance of KafkaStreaming)
-        // 2. Task ids of previously running tasks
-        // 3. Task ids of valid local states on the client's state directory.
-
-        Set<TaskId> prevTasks = streamThread.prevTasks();
-        Set<TaskId> standbyTasks = streamThread.cachedTasks();
-        standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
-
-        return new Subscription(new ArrayList<>(topics), data.encode());
-    }
-
-    @Override
-    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
-        // This assigns tasks to consumer clients in two steps.
-        // 1. using TaskAssignor tasks are assigned to streaming clients.
-        //    - Assign a task to a client which was running it previously.
-        //      If there is no such client, assign a task to a client which has its valid local state.
-        //    - A client may have more than one stream threads.
-        //      The assignor tries to assign tasks to a client proportionally to the number of threads.
-        //    - We try not to assign the same set of tasks to two different clients
-        //    We do the assignment in one pass. The result may not satisfy all of the above.
-        // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
-        Map<UUID, Set<String>> consumersByClient = new HashMap<>();
-        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-
-        // Decode subscription info
-        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
-            String consumerId = entry.getKey();
-            Subscription subscription = entry.getValue();
-
-            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-
-            Set<String> consumers = consumersByClient.get(info.processId);
-            if (consumers == null) {
-                consumers = new HashSet<>();
-                consumersByClient.put(info.processId, consumers);
-            }
-            consumers.add(consumerId);
-
-            ClientState<TaskId> state = states.get(info.processId);
-            if (state == null) {
-                state = new ClientState<>();
-                states.put(info.processId, state);
-            }
-
-            state.prevActiveTasks.addAll(info.prevTasks);
-            state.prevAssignedTasks.addAll(info.prevTasks);
-            state.prevAssignedTasks.addAll(info.standbyTasks);
-            state.capacity = state.capacity + 1d;
-        }
-
-        // get the tasks as partition groups from the partition grouper
-        Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
-        }
-        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
-
-        // add tasks to state topic subscribers
-        stateChangelogTopicToTaskIds = new HashMap<>();
-        internalSourceTopicToTaskIds = new HashMap<>();
-        for (TaskId task : partitionsForTask.keySet()) {
-            for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
-                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
-                if (tasks == null) {
-                    tasks = new HashSet<>();
-                    stateChangelogTopicToTaskIds.put(stateName, tasks);
-                }
-
-                tasks.add(task);
-            }
-
-            for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
-                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
-                if (tasks == null) {
-                    tasks = new HashSet<>();
-                    internalSourceTopicToTaskIds.put(topicName, tasks);
-                }
-
-                tasks.add(task);
-            }
-        }
-
-        // assign tasks to clients
-        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
-        Map<String, Assignment> assignment = new HashMap<>();
-
-        for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
-            UUID processId = entry.getKey();
-            Set<String> consumers = entry.getValue();
-            ClientState<TaskId> state = states.get(processId);
-
-            ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
-            final int numActiveTasks = state.activeTasks.size();
-            for (TaskId taskId : state.activeTasks) {
-                taskIds.add(taskId);
-            }
-            for (TaskId id : state.assignedTasks) {
-                if (!state.activeTasks.contains(id))
-                    taskIds.add(id);
-            }
-
-            final int numConsumers = consumers.size();
-            List<TaskId> active = new ArrayList<>();
-            Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
-
-            int i = 0;
-            for (String consumer : consumers) {
-                List<TopicPartition> activePartitions = new ArrayList<>();
-
-                final int numTaskIds = taskIds.size();
-                for (int j = i; j < numTaskIds; j += numConsumers) {
-                    TaskId taskId = taskIds.get(j);
-                    if (j < numActiveTasks) {
-                        for (TopicPartition partition : partitionsForTask.get(taskId)) {
-                            activePartitions.add(partition);
-                            active.add(taskId);
-                        }
-                    } else {
-                        Set<TopicPartition> standbyPartitions = standby.get(taskId);
-                        if (standbyPartitions == null) {
-                            standbyPartitions = new HashSet<>();
-                            standby.put(taskId, standbyPartitions);
-                        }
-                        standbyPartitions.addAll(partitionsForTask.get(taskId));
-                    }
-                }
-
-                AssignmentInfo data = new AssignmentInfo(active, standby);
-                assignment.put(consumer, new Assignment(activePartitions, data.encode()));
-                i++;
-
-                active.clear();
-                standby.clear();
-            }
-        }
-
-        // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
-        if (zkClient != null) {
-            log.debug("Starting to validate changelog topics in partition assignor.");
-
-            Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
-            topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
-            topicToTaskIds.putAll(internalSourceTopicToTaskIds);
-
-            for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
-                String topic = streamThread.jobId + "-" + entry.getKey();
-
-                // the expected number of partitions is the max value of TaskId.partition + 1
-                int numPartitions = 0;
-                for (TaskId task : entry.getValue()) {
-                    if (numPartitions < task.partition + 1)
-                        numPartitions = task.partition + 1;
-                }
-
-                boolean topicNotReady = true;
-
-                while (topicNotReady) {
-                    Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
-
-                    // if topic does not exist, create it
-                    if (topicMetadata == null) {
-                        try {
-                            createTopic(topic, numPartitions);
-                        } catch (ZkNodeExistsException e) {
-                            // ignore and continue
-                        }
-                    } else {
-                        if (topicMetadata.size() > numPartitions) {
-                            // else if topic exists with more #.partitions than needed, delete in order to re-create it
-                            try {
-                                deleteTopic(topic);
-                            } catch (ZkNodeExistsException e) {
-                                // ignore and continue
-                            }
-                        } else if (topicMetadata.size() < numPartitions) {
-                            // else if topic exists with less #.partitions than needed, add partitions
-                            try {
-                                addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
-                            } catch (ZkNoNodeException e) {
-                                // ignore and continue
-                            }
-                        }
-
-                        topicNotReady = false;
-                    }
-                }
-
-                // wait until the topic metadata has been propagated to all brokers
-                List<PartitionInfo> partitions;
-                do {
-                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
-                } while (partitions == null || partitions.size() != numPartitions);
-            }
-
-            log.info("Completed validating changelog topics in partition assignor.");
-        }
-
-        return assignment;
-    }
-
-    @Override
-    public void onAssignment(Assignment assignment) {
-        List<TopicPartition> partitions = assignment.partitions();
-
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-        this.standbyTasks = info.standbyTasks;
-
-        Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
-        Iterator<TaskId> iter = info.activeTasks.iterator();
-        for (TopicPartition partition : partitions) {
-            Set<TaskId> taskIds = partitionToTaskIds.get(partition);
-            if (taskIds == null) {
-                taskIds = new HashSet<>();
-                partitionToTaskIds.put(partition, taskIds);
-            }
-
-            if (iter.hasNext()) {
-                taskIds.add(iter.next());
-            } else {
-                TaskAssignmentException ex = new TaskAssignmentException(
-                        "failed to find a task id for the partition=" + partition.toString() +
-                        ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
-                );
-                log.error(ex.getMessage(), ex);
-                throw ex;
-            }
-        }
-        this.partitionToTaskIds = partitionToTaskIds;
-    }
-
-    /* For Test Only */
-    public Set<TaskId> tasksForState(String stateName) {
-        return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
-    }
-
-    public Set<TaskId> tasksForPartition(TopicPartition partition) {
-        return partitionToTaskIds.get(partition);
-    }
-
-    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-        return standbyTasks;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3429df3..102c534 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -38,7 +38,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
 
     private final TaskId id;
     private final StreamTask task;
-    private final StreamingMetrics metrics;
+    private final StreamsMetrics metrics;
     private final RecordCollector collector;
     private final ProcessorStateManager stateMgr;
 
@@ -52,10 +52,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     @SuppressWarnings("unchecked")
     public ProcessorContextImpl(TaskId id,
                                 StreamTask task,
-                                StreamingConfig config,
+                                StreamsConfig config,
                                 RecordCollector collector,
                                 ProcessorStateManager stateMgr,
-                                StreamingMetrics metrics) {
+                                StreamsMetrics metrics) {
         this.id = id;
         this.task = task;
         this.metrics = metrics;
@@ -113,7 +113,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public StreamingMetrics metrics() {
+    public StreamsMetrics metrics() {
         return metrics;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index fe0472e..25c663d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +72,7 @@ public class RecordCollector {
     }
 
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                            StreamPartitioner<K, V> partitioner) {
+                            StreamsPartitioner<K, V> partitioner) {
         byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
         byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
         Integer partition = null;

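Since send() now takes the renamed partitioner type, here is a minimal sketch of a custom partitioner against it. It assumes StreamsPartitioner keeps the single partition(key, value, numPartitions) method that StreamPartitioner exposed before the rename; the class name and routing scheme are invented for illustration.

    import org.apache.kafka.streams.processor.StreamsPartitioner;

    // Hypothetical partitioner: routes records by key length.
    // Assumes the renamed interface keeps StreamPartitioner's single method,
    // Integer partition(K key, V value, int numPartitions), where returning
    // null falls back to the producer's default partitioning.
    public class KeyLengthPartitioner implements StreamsPartitioner<String, String> {
        @Override
        public Integer partition(String key, String value, int numPartitions) {
            if (key == null)
                return null; // defer to the default partitioner
            return key.length() % numPartitions;
        }
    }
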
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 7ab59ee..88b3f56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 
 public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
     private final String topic;
     private Serializer<K> keySerializer;
     private Serializer<V> valSerializer;
-    private final StreamPartitioner<K, V> partitioner;
+    private final StreamsPartitioner<K, V> partitioner;
 
     private ProcessorContext context;
 
-    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
+    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner) {
         super(name);
 
         this.topic = topic;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index e0583e3..89d185c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -36,7 +36,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
 
     private final TaskId id;
-    private final StreamingMetrics metrics;
+    private final StreamsMetrics metrics;
     private final ProcessorStateManager stateMgr;
 
     private final Serializer<?> keySerializer;
@@ -47,9 +47,9 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private boolean initialized;
 
     public StandbyContextImpl(TaskId id,
-                              StreamingConfig config,
+                              StreamsConfig config,
                               ProcessorStateManager stateMgr,
-                              StreamingMetrics metrics) {
+                              StreamsMetrics metrics) {
         this.id = id;
         this.metrics = metrics;
         this.stateMgr = stateMgr;
@@ -105,7 +105,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     }
 
     @Override
-    public StreamingMetrics metrics() {
+    public StreamsMetrics metrics() {
         return metrics;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 4cc4ea4..861b830 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -21,8 +21,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +50,8 @@ public class StandbyTask extends AbstractTask {
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
      * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
-     * @param config                the {@link StreamingConfig} specified by the user
-     * @param metrics               the {@link StreamingMetrics} created by the thread
+     * @param config                the {@link StreamsConfig} specified by the user
+     * @param metrics               the {@link StreamsMetrics} created by the thread
      */
     public StandbyTask(TaskId id,
                        String jobId,
@@ -59,8 +59,8 @@ public class StandbyTask extends AbstractTask {
                        ProcessorTopology topology,
                        Consumer<byte[], byte[]> consumer,
                        Consumer<byte[], byte[]> restoreConsumer,
-                       StreamingConfig config,
-                       StreamingMetrics metrics) {
+                       StreamsConfig config,
+                       StreamsMetrics metrics) {
         super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
 
         // initialize the topology with its own context

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
new file mode 100644
index 0000000..5d87e5a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.ClientState;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.ZooDefs;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.I0Itec.zkclient.ZkClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+
+    private StreamThread streamThread;
+
+    private int numStandbyReplicas;
+    private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
+    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
+    private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
+    private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
+    private Map<TaskId, Set<TopicPartition>> standbyTasks;
+
+    // TODO: the following ZK dependency should be removed after KIP-4
+    private static final String ZK_TOPIC_PATH = "/brokers/topics";
+    private static final String ZK_BROKER_PATH = "/brokers/ids";
+    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
+
+    private ZkClient zkClient;
+
+    private class ZKStringSerializer implements ZkSerializer {
+
+        @Override
+        public byte[] serialize(Object data) {
+            try {
+                return ((String) data).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+
+        @Override
+        public Object deserialize(byte[] bytes) {
+            try {
+                if (bytes == null)
+                    return null;
+                else
+                    return new String(bytes, "UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+    }
+
+    private List<Integer> getBrokers() {
+        List<Integer> brokers = new ArrayList<>();
+        for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
+            brokers.add(Integer.parseInt(broker));
+        }
+        Collections.sort(brokers);
+
+        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
+
+        return brokers;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
+
+        if (data == null) return null;
+
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+
+            Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() { });
+
+            Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
+
+            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
+
+            return partitions;
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
+        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
+
+        // we always assign leaders to brokers starting at the first one with replication factor 1
+        List<Integer> brokers = getBrokers();
+
+        Map<Integer, List<Integer>> assignment = new HashMap<>();
+        for (int i = 0; i < numPartitions; i++) {
+            assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
+        }
+
+        // try to write to ZK with open ACL
+        try {
+            Map<String, Object> dataMap = new HashMap<>();
+            dataMap.put("version", 1);
+            dataMap.put("partitions", assignment);
+
+            ObjectMapper mapper = new ObjectMapper();
+            String data = mapper.writeValueAsString(dataMap);
+
+            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+        } catch (JsonProcessingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    private void deleteTopic(String topic) throws ZkNodeExistsException {
+        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
+
+        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+    }
+
+    private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
+        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
+
+        // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
+        List<Integer> brokers = getBrokers();
+
+        int startIndex = existingAssignment.size();
+
+        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
+
+        for (int i = 0; i < numPartitions; i++) {
+            newAssignment.put(i + startIndex, Collections.singletonList(brokers.get((i + startIndex) % brokers.size())));
+        }
+
+        // try to write to ZK with open ACL
+        try {
+            Map<String, Object> dataMap = new HashMap<>();
+            dataMap.put("version", 1);
+            dataMap.put("partitions", newAssignment);
+
+            ObjectMapper mapper = new ObjectMapper();
+            String data = mapper.writeValueAsString(dataMap);
+
+            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
+        } catch (JsonProcessingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * The PartitionAssignor and its StreamThread need to be mutually accessible,
+     * since the former needs the latter's cached metadata when sending subscriptions,
+     * and the latter needs the former's returned assignment when adding tasks.
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
+        Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
+        if (o == null) {
+            KafkaException ex = new KafkaException("StreamThread is not specified");
+            log.error(ex.getMessage(), ex);
+            throw ex;
+        }
+
+        if (!(o instanceof StreamThread)) {
+            KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
+            log.error(ex.getMessage(), ex);
+            throw ex;
+        }
+
+        streamThread = (StreamThread) o;
+        streamThread.partitionAssignor(this);
+
+        this.topicGroups = streamThread.builder.topicGroups();
+
+        if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG))
+            zkClient = new ZkClient((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
+    }
+
+    @Override
+    public String name() {
+        return "stream";
+    }
+
+    @Override
+    public Subscription subscription(Set<String> topics) {
+        // Add the following information to the subscription:
+        // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
+        // 2. Task ids of previously running tasks
+        // 3. Task ids of valid local states on the client's state directory.
+
+        Set<TaskId> prevTasks = streamThread.prevTasks();
+        Set<TaskId> standbyTasks = streamThread.cachedTasks();
+        standbyTasks.removeAll(prevTasks);
+        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
+
+        return new Subscription(new ArrayList<>(topics), data.encode());
+    }
+
+    @Override
+    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+        // This assigns tasks to consumer clients in two steps.
+        // 1. Use TaskAssignor to assign tasks to client processes.
+        //    - Assign a task to the client that was running it previously.
+        //      If there is no such client, assign it to a client that has a valid local state for it.
+        //    - A client may have more than one stream thread.
+        //      The assignor tries to assign tasks to a client proportionally to its number of threads.
+        //    - We try not to assign the same set of tasks to two different clients.
+        //    The assignment is done in one pass, so the result may not satisfy all of the above.
+        // 2. Within each client, assign tasks to its consumers in a round-robin manner.
+        Map<UUID, Set<String>> consumersByClient = new HashMap<>();
+        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+
+        // Decode subscription info
+        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
+            String consumerId = entry.getKey();
+            Subscription subscription = entry.getValue();
+
+            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+
+            Set<String> consumers = consumersByClient.get(info.processId);
+            if (consumers == null) {
+                consumers = new HashSet<>();
+                consumersByClient.put(info.processId, consumers);
+            }
+            consumers.add(consumerId);
+
+            ClientState<TaskId> state = states.get(info.processId);
+            if (state == null) {
+                state = new ClientState<>();
+                states.put(info.processId, state);
+            }
+
+            state.prevActiveTasks.addAll(info.prevTasks);
+            state.prevAssignedTasks.addAll(info.prevTasks);
+            state.prevAssignedTasks.addAll(info.standbyTasks);
+            state.capacity = state.capacity + 1d;
+        }
+
+        // get the tasks as partition groups from the partition grouper
+        Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
+        }
+        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
+
+        // add tasks to state topic subscribers
+        stateChangelogTopicToTaskIds = new HashMap<>();
+        internalSourceTopicToTaskIds = new HashMap<>();
+        for (TaskId task : partitionsForTask.keySet()) {
+            for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
+                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
+                if (tasks == null) {
+                    tasks = new HashSet<>();
+                    stateChangelogTopicToTaskIds.put(stateName, tasks);
+                }
+
+                tasks.add(task);
+            }
+
+            for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
+                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
+                if (tasks == null) {
+                    tasks = new HashSet<>();
+                    internalSourceTopicToTaskIds.put(topicName, tasks);
+                }
+
+                tasks.add(task);
+            }
+        }
+
+        // assign tasks to clients
+        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+        Map<String, Assignment> assignment = new HashMap<>();
+
+        for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
+            UUID processId = entry.getKey();
+            Set<String> consumers = entry.getValue();
+            ClientState<TaskId> state = states.get(processId);
+
+            ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
+            final int numActiveTasks = state.activeTasks.size();
+            for (TaskId taskId : state.activeTasks) {
+                taskIds.add(taskId);
+            }
+            for (TaskId id : state.assignedTasks) {
+                if (!state.activeTasks.contains(id))
+                    taskIds.add(id);
+            }
+
+            final int numConsumers = consumers.size();
+            List<TaskId> active = new ArrayList<>();
+            Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
+
+            int i = 0;
+            for (String consumer : consumers) {
+                List<TopicPartition> activePartitions = new ArrayList<>();
+
+                final int numTaskIds = taskIds.size();
+                for (int j = i; j < numTaskIds; j += numConsumers) {
+                    TaskId taskId = taskIds.get(j);
+                    if (j < numActiveTasks) {
+                        for (TopicPartition partition : partitionsForTask.get(taskId)) {
+                            activePartitions.add(partition);
+                            active.add(taskId);
+                        }
+                    } else {
+                        Set<TopicPartition> standbyPartitions = standby.get(taskId);
+                        if (standbyPartitions == null) {
+                            standbyPartitions = new HashSet<>();
+                            standby.put(taskId, standbyPartitions);
+                        }
+                        standbyPartitions.addAll(partitionsForTask.get(taskId));
+                    }
+                }
+
+                AssignmentInfo data = new AssignmentInfo(active, standby);
+                assignment.put(consumer, new Assignment(activePartitions, data.encode()));
+                i++;
+
+                active.clear();
+                standby.clear();
+            }
+        }
+
+        // if ZK is specified, collect the tasks for each state changelog and internal source topic, and validate the topic partitions
+        if (zkClient != null) {
+            log.debug("Starting to validate changelog topics in partition assignor.");
+
+            Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
+            topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
+            topicToTaskIds.putAll(internalSourceTopicToTaskIds);
+
+            for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
+                String topic = streamThread.jobId + "-" + entry.getKey();
+
+                // the expected number of partitions is the max value of TaskId.partition + 1
+                int numPartitions = 0;
+                for (TaskId task : entry.getValue()) {
+                    if (numPartitions < task.partition + 1)
+                        numPartitions = task.partition + 1;
+                }
+
+                boolean topicNotReady = true;
+
+                while (topicNotReady) {
+                    Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+
+                    // if topic does not exist, create it
+                    if (topicMetadata == null) {
+                        try {
+                            createTopic(topic, numPartitions);
+                        } catch (ZkNodeExistsException e) {
+                            // ignore and continue
+                        }
+                    } else {
+                        if (topicMetadata.size() > numPartitions) {
+                            // if the topic exists with more partitions than needed, delete it in order to re-create it
+                            try {
+                                deleteTopic(topic);
+                            } catch (ZkNodeExistsException e) {
+                                // ignore and continue
+                            }
+                        } else if (topicMetadata.size() < numPartitions) {
+                            // if the topic exists with fewer partitions than needed, add the missing partitions
+                            try {
+                                addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
+                            } catch (ZkNoNodeException e) {
+                                // ignore and continue
+                            }
+                        }
+
+                        topicNotReady = false;
+                    }
+                }
+
+                // wait until the topic metadata has been propagated to all brokers
+                List<PartitionInfo> partitions;
+                do {
+                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                } while (partitions == null || partitions.size() != numPartitions);
+            }
+
+            log.info("Completed validating changelog topics in partition assignor.");
+        }
+
+        return assignment;
+    }
+
+    @Override
+    public void onAssignment(Assignment assignment) {
+        List<TopicPartition> partitions = assignment.partitions();
+
+        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+        this.standbyTasks = info.standbyTasks;
+
+        Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
+        Iterator<TaskId> iter = info.activeTasks.iterator();
+        for (TopicPartition partition : partitions) {
+            Set<TaskId> taskIds = partitionToTaskIds.get(partition);
+            if (taskIds == null) {
+                taskIds = new HashSet<>();
+                partitionToTaskIds.put(partition, taskIds);
+            }
+
+            if (iter.hasNext()) {
+                taskIds.add(iter.next());
+            } else {
+                TaskAssignmentException ex = new TaskAssignmentException(
+                        "failed to find a task id for the partition=" + partition.toString() +
+                        ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
+                );
+                log.error(ex.getMessage(), ex);
+                throw ex;
+            }
+        }
+        this.partitionToTaskIds = partitionToTaskIds;
+    }
+
+    /* For Test Only */
+    public Set<TaskId> tasksForState(String stateName) {
+        return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+    }
+
+    public Set<TaskId> tasksForPartition(TopicPartition partition) {
+        return partitionToTaskIds.get(partition);
+    }
+
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
+        return standbyTasks;
+    }
+}

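For readers tracing the round-robin step of assign() above, here is a standalone sketch of the interleaving: consumer k receives tasks k, k + numConsumers, k + 2 * numConsumers, and so on, with the first numActiveTasks entries of the task list becoming active assignments and the remainder becoming standbys. Task ids and consumer names are invented for illustration.

    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinSketch {
        public static void main(String[] args) {
            // three active tasks followed by two standby tasks, matching how taskIds is ordered in assign()
            List<String> taskIds = Arrays.asList("0_0", "0_1", "0_2", "1_0", "1_1");
            int numActiveTasks = 3;
            String[] consumers = {"consumer-1", "consumer-2"};
            for (int i = 0; i < consumers.length; i++) {
                // same indexing as assign(): start at the consumer's index, step by the consumer count
                for (int j = i; j < taskIds.size(); j += consumers.length) {
                    String role = (j < numActiveTasks) ? "active" : "standby";
                    System.out.println(consumers[i] + " <- " + taskIds.get(j) + " (" + role + ")");
                }
            }
        }
    }

With two consumers this prints 0_0 and 0_2 as active on consumer-1 plus 1_1 as its standby, and 0_1 as active on consumer-2 plus 1_0 as its standby.
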
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2e58ad5..ec3d011 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
@@ -67,8 +67,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param consumer              the instance of {@link Consumer}
      * @param producer              the instance of {@link Producer}
      * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
-     * @param config                the {@link StreamingConfig} specified by the user
-     * @param metrics               the {@link StreamingMetrics} created by the thread
+     * @param config                the {@link StreamsConfig} specified by the user
+     * @param metrics               the {@link StreamsMetrics} created by the thread
      */
     public StreamTask(TaskId id,
                       String jobId,
@@ -77,11 +77,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       Consumer<byte[], byte[]> consumer,
                       Producer<byte[], byte[]> producer,
                       Consumer<byte[], byte[]> restoreConsumer,
-                      StreamingConfig config,
-                      StreamingMetrics metrics) {
+                      StreamsConfig config,
+                      StreamsMetrics metrics) {
         super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
         this.punctuationQueue = new PunctuationQueue();
-        this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
         // create queues for each assigned partition and associate them
         // to corresponding source nodes in the processor topology
@@ -93,7 +93,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
             partitionQueues.put(partition, queue);
         }
 
-        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
         this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
         // initialize the consumed offset cache


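The StreamTask constructor above reads its tuning values through the renamed StreamsConfig. A minimal sketch of building such a config follows; only keys visible in this diff are used, except BOOTSTRAP_SERVERS_CONFIG and JOB_ID_CONFIG, which are assumed to be the standard client and job-identifier keys of this snapshot, and all values are illustrative.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsConfigSketch {
        public static StreamsConfig build() {
            Properties props = new Properties();
            props.put(StreamsConfig.JOB_ID_CONFIG, "example-job");               // assumed key name in this snapshot
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed standard client key
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);           // commit every 30 seconds
            props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);
            return new StreamsConfig(props);
        }
    }
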
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 578357a..e5d0922 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -39,8 +39,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -67,14 +67,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class StreamThread extends Thread {
 
     private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
-    private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
+    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     public final PartitionGrouper partitionGrouper;
     public final String jobId;
     public final String clientId;
     public final UUID processId;
 
-    protected final StreamingConfig config;
+    protected final StreamsConfig config;
     protected final TopologyBuilder builder;
     protected final Set<String> sourceTopics;
     protected final Producer<byte[], byte[]> producer;
@@ -93,9 +93,9 @@ public class StreamThread extends Thread {
     private final long cleanTimeMs;
     private final long commitTimeMs;
     private final long totalRecordsToProcess;
-    private final StreamingMetricsImpl sensors;
+    private final StreamsMetricsImpl sensors;
 
-    private KafkaStreamingPartitionAssignor partitionAssignor = null;
+    private StreamPartitionAssignor partitionAssignor = null;
 
     private long lastClean;
     private long lastCommit;
@@ -122,17 +122,17 @@ public class StreamThread extends Thread {
     };
 
     public StreamThread(TopologyBuilder builder,
-                        StreamingConfig config,
+                        StreamsConfig config,
                         String jobId,
                         String clientId,
                         UUID processId,
                         Metrics metrics,
-                        Time time) throws Exception {
+                        Time time) {
         this(builder, config, null, null, null, jobId, clientId, processId, metrics, time);
     }
 
     StreamThread(TopologyBuilder builder,
-                 StreamingConfig config,
+                 StreamsConfig config,
                  Producer<byte[], byte[]> producer,
                  Consumer<byte[], byte[]> consumer,
                  Consumer<byte[], byte[]> restoreConsumer,
@@ -140,8 +140,8 @@ public class StreamThread extends Thread {
                  String clientId,
                  UUID processId,
                  Metrics metrics,
-                 Time time) throws Exception {
-        super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
+                 Time time) {
+        super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
         this.jobId = jobId;
         this.config = config;
@@ -149,7 +149,7 @@ public class StreamThread extends Thread {
         this.sourceTopics = builder.sourceTopics();
         this.clientId = clientId;
         this.processId = processId;
-        this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+        this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
 
         // set the producer and consumer clients
         this.producer = (producer != null) ? producer : createProducer();
@@ -167,24 +167,24 @@ public class StreamThread extends Thread {
         this.standbyRecords = new HashMap<>();
 
         // read in task specific config values
-        this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
+        this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
         this.stateDir.mkdir();
-        this.pollTimeMs = config.getLong(StreamingConfig.POLL_MS_CONFIG);
-        this.commitTimeMs = config.getLong(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG);
-        this.cleanTimeMs = config.getLong(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
-        this.totalRecordsToProcess = config.getLong(StreamingConfig.TOTAL_RECORDS_TO_PROCESS);
+        this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+        this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+        this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+        this.totalRecordsToProcess = config.getLong(StreamsConfig.TOTAL_RECORDS_TO_PROCESS);
 
         this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
         this.lastCommit = time.milliseconds();
         this.recordsProcessed = 0;
         this.time = time;
 
-        this.sensors = new StreamingMetricsImpl(metrics);
+        this.sensors = new StreamsMetricsImpl(metrics);
 
         this.running = new AtomicBoolean(true);
     }
 
-    public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
+    public void partitionAssignor(StreamPartitionAssignor partitionAssignor) {
         this.partitionAssignor = partitionAssignor;
     }
 
@@ -227,7 +227,7 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Shutdown this streaming thread.
+     * Shut down this stream thread.
      */
     public void close() {
         running.set(false);
@@ -673,7 +673,7 @@ public class StreamThread extends Thread {
         }
     }
 
-    private class StreamingMetricsImpl implements StreamingMetrics {
+    private class StreamsMetricsImpl implements StreamsMetrics {
         final Metrics metrics;
         final String metricGrpName;
         final Map<String, String> metricTags;
@@ -685,10 +685,10 @@ public class StreamThread extends Thread {
         final Sensor taskCreationSensor;
         final Sensor taskDestructionSensor;
 
-        public StreamingMetricsImpl(Metrics metrics) {
+        public StreamsMetricsImpl(Metrics metrics) {
 
             this.metrics = metrics;
-            this.metricGrpName = "streaming-metrics";
+            this.metricGrpName = "stream-metrics";
             this.metricTags = new LinkedHashMap<>();
             this.metricTags.put("client-id", clientId + "-" + getName());
 
@@ -734,7 +734,7 @@ public class StreamThread extends Thread {
             for (int i = 0; i < tags.length; i += 2)
                 tagMap.put(tags[i], tags[i + 1]);
 
-            String metricGroupName = "streaming-" + scopeName + "-metrics";
+            String metricGroupName = "stream-" + scopeName + "-metrics";
 
             // first add the global operation metrics if not yet, with the global tags only
             Sensor parent = metrics.sensor(scopeName + "-" + operationName);

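The metric group names above change from "streaming-..." to "stream-...", so dashboards keyed on the old names need updating. As a sketch of how callers record through the renamed interface, assuming StreamsMetrics keeps the addLatencySensor/recordLatency pair that StreamingMetrics exposed (the scope, entity, and operation names here are hypothetical):

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.StreamsMetrics;

    public class TimedOperationSketch {
        private final StreamsMetrics metrics;
        private final Sensor putTime;

        public TimedOperationSketch(StreamsMetrics metrics) {
            this.metrics = metrics;
            // lands in the "stream-my-scope-metrics" group after this commit
            this.putTime = metrics.addLatencySensor("my-scope", "my-entity", "put");
        }

        public void run(Runnable op) {
            long startNs = System.nanoTime();
            try {
                op.run();
            } finally {
                metrics.recordLatency(putTime, startNs, System.nanoTime());
            }
        }
    }
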
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java b/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
deleted file mode 100644
index 183b691..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-public class Entry<K, V> {
-
-    private final K key;
-    private final V value;
-
-    public Entry(K key, V value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    public K key() {
-        return key;
-    }
-
-    public V value() {
-        return value;
-    }
-
-    public String toString() {
-        return "Entry(" + key() + ", " + value() + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index 0fbd4ae..bd118a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -19,10 +19,12 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.KeyValue;
+
 import java.io.Closeable;
 import java.util.Iterator;
 
-public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>>, Closeable {
+public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>, Closeable {
 
     @Override
     public void close();

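With the iterator now yielding KeyValue instead of the removed Entry class, callers switch from the key()/value() accessors to KeyValue's public key/value fields (visible in the putAll diffs below). A minimal sketch, assuming the store's all() method, which this commit leaves unchanged:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class StoreDumpSketch {
        public static <K, V> void dump(KeyValueStore<K, V> store) {
            // KeyValueIterator is Closeable, so try-with-resources releases it
            try (KeyValueIterator<K, V> iter = store.all()) {
                while (iter.hasNext()) {
                    KeyValue<K, V> kv = iter.next();
                    System.out.println(kv.key + " -> " + kv.value);
                }
            }
        }
    }
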
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index e4faed1..d448044 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -19,6 +19,7 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.List;
@@ -55,7 +56,7 @@ public interface KeyValueStore<K, V> extends StateStore {
      * @param entries A list of entries to put into the store.
      * @throws NullPointerException If null is used for any key or value.
      */
-    abstract public void putAll(List<Entry<K, V>> entries);
+    abstract public void putAll(List<KeyValue<K, V>> entries);
 
     /**
      * Delete the value from the store (if there is one)

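Correspondingly, batch writers build their lists with the KeyValue constructor used throughout the store diffs below. A short sketch of the updated putAll call (keys and values invented):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class PutAllSketch {
        public static void writeBatch(KeyValueStore<String, Long> store) {
            List<KeyValue<String, Long>> batch = Arrays.asList(
                    new KeyValue<>("alice", 1L),
                    new KeyValue<>("bob", 2L));
            store.putAll(batch);
        }
    }
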
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index 55d1ac3..08cd049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -19,7 +19,7 @@
 
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 286db1b..4856b09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -18,10 +18,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -97,9 +97,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
+        public void putAll(List<KeyValue<K, V>> entries) {
+            for (KeyValue<K, V> entry : entries)
+                put(entry.key, entry.value);
         }
 
         @Override
@@ -140,9 +140,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
             }
 
             @Override
-            public Entry<K, V> next() {
+            public KeyValue<K, V> next() {
                 Map.Entry<K, V> entry = iter.next();
-                return new Entry<>(entry.getKey(), entry.getValue());
+                return new KeyValue<>(entry.getKey(), entry.getValue());
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 6a38423..22ee3f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -131,9 +131,9 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
+        public void putAll(List<KeyValue<K, V>> entries) {
+            for (KeyValue<K, V> entry : entries)
+                put(entry.key, entry.value);
         }
 
         @Override
@@ -179,9 +179,9 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
             }
 
             @Override
-            public Entry<K, V> next() {
+            public KeyValue<K, V> next() {
                 lastKey = keys.next();
-                return new Entry<>(lastKey, entries.get(lastKey));
+                return new KeyValue<>(lastKey, entries.get(lastKey));
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 21f73b0..d5fe44a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -18,13 +18,13 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -47,7 +47,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private Sensor rangeTime;
     private Sensor flushTime;
     private Sensor restoreTime;
-    private StreamingMetrics metrics;
+    private StreamsMetrics metrics;
 
     private boolean loggingEnabled = true;
     private StoreChangeLogger<K, V> changeLogger = null;
@@ -141,14 +141,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void putAll(List<Entry<K, V>> entries) {
+    public void putAll(List<KeyValue<K, V>> entries) {
         long startNs = time.nanoseconds();
         try {
             this.inner.putAll(entries);
 
             if (loggingEnabled) {
-                for (Entry<K, V> entry : entries) {
-                    K key = entry.key();
+                for (KeyValue<K, V> entry : entries) {
+                    K key = entry.key;
                     changeLogger.add(key);
                 }
                 changeLogger.maybeLogChange(this.getter);
@@ -231,7 +231,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         }
 
         @Override
-        public Entry<K1, V1> next() {
+        public KeyValue<K1, V1> next() {
             return iter.next();
         }
 

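The putAll() change above keeps the store's metering pattern intact: the operation is timed in nanoseconds and the elapsed latency is recorded on a Sensor. A simplified, self-contained sketch of that wrapper idea, using a plain callback in place of Kafka's Sensor purely as an illustrative assumption:

    import java.util.function.LongConsumer;
    import java.util.function.Supplier;

    public class MeteredCallSketch {
        // time an inner call in nanoseconds and report the elapsed latency
        static <T> T timed(Supplier<T> inner, LongConsumer latencySensor) {
            long startNs = System.nanoTime();
            try {
                return inner.get();
            } finally {
                latencySensor.accept(System.nanoTime() - startNs);
            }
        }

        public static void main(String[] args) {
            String result = timed(() -> "value",
                    elapsedNs -> System.out.println("call took " + elapsedNs + " ns"));
            System.out.println(result);
        }
    }
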
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 821927d..862c322 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.state.Serdes;
@@ -40,7 +40,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     private Sensor rangeTime;
     private Sensor flushTime;
     private Sensor restoreTime;
-    private StreamingMetrics metrics;
+    private StreamsMetrics metrics;
 
     private boolean loggingEnabled = true;
     private StoreChangeLogger<byte[], byte[]> changeLogger = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8a600f9..6c77ab2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -18,8 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -147,9 +147,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void putAll(List<Entry<K, V>> entries) {
-        for (Entry<K, V> entry : entries)
-            put(entry.key(), entry.value());
+    public void putAll(List<KeyValue<K, V>> entries) {
+        for (KeyValue<K, V> entry : entries)
+            put(entry.key, entry.value);
     }
 
     @Override
@@ -200,8 +200,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             return iter.key();
         }
 
-        protected Entry<K, V> getEntry() {
-            return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+        protected KeyValue<K, V> getKeyValue() {
+            return new KeyValue<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
         }
 
         @Override
@@ -210,11 +210,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
 
         @Override
-        public Entry<K, V> next() {
+        public KeyValue<K, V> next() {
             if (!hasNext())
                 throw new NoSuchElementException();
 
-            Entry<K, V> entry = this.getEntry();
+            KeyValue<K, V> entry = this.getKeyValue();
             iter.next();
             return entry;
         }

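The iterator contract in the hunk above is worth noting: next() returns a KeyValue and throws NoSuchElementException once the cursor is exhausted, materializing each pair from the underlying RocksDB iterator. A minimal sketch of the same contract, backed by a TreeMap instead of RocksDB (the backing map is an illustrative assumption):

    import org.apache.kafka.streams.KeyValue;

    import java.util.Iterator;
    import java.util.Map;
    import java.util.NoSuchElementException;
    import java.util.TreeMap;

    public class KeyValueIteratorSketch implements Iterator<KeyValue<String, Long>> {
        private final Iterator<Map.Entry<String, Long>> iter;

        KeyValueIteratorSketch(TreeMap<String, Long> map) {
            this.iter = map.entrySet().iterator();
        }

        @Override
        public boolean hasNext() {
            return iter.hasNext();
        }

        @Override
        public KeyValue<String, Long> next() {
            if (!hasNext())
                throw new NoSuchElementException();
            Map.Entry<String, Long> entry = iter.next();
            return new KeyValue<>(entry.getKey(), entry.getValue());
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        public static void main(String[] args) {
            TreeMap<String, Long> map = new TreeMap<>();
            map.put("a", 1L);
            map.put("b", 2L);
            KeyValueIteratorSketch it = new KeyValueIteratorSketch(map);
            while (it.hasNext()) {
                KeyValue<String, Long> kv = it.next();
                System.out.println(kv.key + "=" + kv.value);
            }
        }
    }
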
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 933ed91..d854c92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,9 +20,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.Serdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -86,10 +85,10 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (index >= iterators.length)
                 throw new NoSuchElementException();
 
-            Entry<byte[], byte[]> entry = iterators[index].next();
+            KeyValue<byte[], byte[]> kv = iterators[index].next();
 
-            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
-                                  serdes.valueFrom(entry.value()));
+            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(kv.key),
+                                  serdes.valueFrom(kv.value));
         }
 
         @Override

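The hunk above recovers a timestamp directly from the raw key bytes via WindowStoreUtil.timestampFromBinaryKey. An illustrative sketch of the composite-key idea behind that call: the window store appends a fixed-width timestamp to the serialized key so it can be read back from the binary form. The layout below is a simplified assumption, not the exact on-disk format:

    import java.nio.ByteBuffer;

    public class WindowKeySketch {
        static byte[] toBinaryKey(byte[] serializedKey, long timestamp) {
            return ByteBuffer.allocate(serializedKey.length + 8)
                    .put(serializedKey)
                    .putLong(timestamp)            // fixed-width timestamp suffix
                    .array();
        }

        static long timestampFromBinaryKey(byte[] binaryKey) {
            // read the trailing 8 bytes back as the timestamp
            return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 8);
        }

        public static void main(String[] args) {
            byte[] key = toBinaryKey(new byte[] {1, 2, 3}, 1000L);
            System.out.println(timestampFromBinaryKey(key));   // 1000
        }
    }
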
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
deleted file mode 100644
index 3b3fc9b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Map;
-import java.util.Properties;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-
-
-public class StreamingConfigTest {
-
-    private Properties props = new Properties();
-    private StreamingConfig streamingConfig;
-    private StreamThread streamThreadPlaceHolder;
-
-
-    @Before
-    public void setUp() {
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        streamingConfig = new StreamingConfig(props);
-    }
-
-    @Test
-    public void testGetProducerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamingConfig.getProducerConfigs("client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
-    }
-
-    @Test
-    public void testGetConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
-        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
-
-    }
-
-    @Test
-    public void testGetRestoreConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs("client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
-        assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
new file mode 100644
index 0000000..777fae5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Properties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+
+public class StreamsConfigTest {
+
+    private Properties props = new Properties();
+    private StreamsConfig streamsConfig;
+    private StreamThread streamThreadPlaceHolder;
+
+
+    @Before
+    public void setUp() {
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-config-test");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        streamsConfig = new StreamsConfig(props);
+    }
+
+    @Test
+    public void testGetProducerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
+    }
+
+    @Test
+    public void testGetConsumerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
+        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
+
+    }
+
+    @Test
+    public void testGetRestoreConsumerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
+        assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
+    }
+}

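Beyond the class rename, the new test's setUp() adds StreamsConfig.JOB_ID_CONFIG, which the deleted StreamingConfigTest did not set, suggesting the renamed config now expects it. A hedged sketch of the minimal application-side setup implied by the test (the job id and serializer choices are illustrative assumptions):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsConfigSketch {
        public static StreamsConfig build() {
            Properties props = new Properties();
            props.put(StreamsConfig.JOB_ID_CONFIG, "my-streams-job");             // new with the rename
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new StreamsConfig(props);
        }
    }
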
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index a55fd30..693f58e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 880adce..f226cee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 0f7cb6a..73c517b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 0b7b1e7..426259f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index aa09e74..12bfb9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 1527f17..e3cf22b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 67b83f5..feabc08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
deleted file mode 100644
index 1b8cbb8..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-public class WindowedStreamPartitionerTest {
-
-    private String topicName = "topic";
-
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-
-    private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
-    @Test
-    public void testCopartitioning() {
-
-        Random rand = new Random();
-
-        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
-
-        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
-        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
-
-        for (int k = 0; k < 10; k++) {
-            Integer key = rand.nextInt();
-            byte[] keyBytes = keySerializer.serialize(topicName, key);
-
-            String value = key.toString();
-            byte[] valueBytes = valSerializer.serialize(topicName, value);
-
-            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
-
-            for (int w = 0; w < 10; w++) {
-                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
-
-                Windowed<Integer> windowedKey = new Windowed<>(key, window);
-                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
-
-                assertEquals(expected, actual);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
new file mode 100644
index 0000000..18494fd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowedStreamsPartitionerTest {
+
+    private String topicName = "topic";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+
+    private List<PartitionInfo> infos = Arrays.asList(
+            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
+    );
+
+    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+    @Test
+    public void testCopartitioning() {
+
+        Random rand = new Random();
+
+        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
+
+        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+        WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer);
+
+        for (int k = 0; k < 10; k++) {
+            Integer key = rand.nextInt();
+            byte[] keyBytes = keySerializer.serialize(topicName, key);
+
+            String value = key.toString();
+            byte[] valueBytes = valSerializer.serialize(topicName, value);
+
+            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
+
+            for (int w = 0; w < 10; w++) {
+                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
+
+                Windowed<Integer> windowedKey = new Windowed<>(key, window);
+                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
+
+                assertEquals(expected, actual);
+            }
+        }
+    }
+
+}

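The copartitioning assertion in the test above holds because both partitioners hash only the serialized key bytes: the windowed serializer wraps the underlying key serializer, so the test expects a Windowed<Integer> key to land on the same partition as the raw Integer key. A sketch of the positive-murmur2-mod computation the default partitioner is built on, shown here as an illustration of the hashing idea rather than the exact production code path:

    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.utils.Utils;

    public class CopartitioningSketch {
        public static void main(String[] args) {
            int numPartitions = 6;
            byte[] keyBytes = new IntegerSerializer().serialize("topic", 42);
            // murmur2 over the key bytes, masked to non-negative, modulo the partition count
            int partition = (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
            System.out.println("key 42 -> partition " + partition);
        }
    }
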
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
deleted file mode 100644
index 92d7b6a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-
-public class KafkaStreamingPartitionAssignorTest {
-
-    private TopicPartition t1p0 = new TopicPartition("topic1", 0);
-    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
-    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
-    private TopicPartition t2p0 = new TopicPartition("topic2", 0);
-    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
-    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
-    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
-    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
-    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
-    private TopicPartition t3p3 = new TopicPartition("topic3", 3);
-
-    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
-
-    private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
-    private final TaskId task0 = new TaskId(0, 0);
-    private final TaskId task1 = new TaskId(0, 1);
-    private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(0, 3);
-
-    private Properties configProps() {
-        return new Properties() {
-            {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-            }
-        };
-    }
-
-    private ByteArraySerializer serializer = new ByteArraySerializer();
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSubscription() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-
-        final Set<TaskId> prevTasks = Utils.mkSet(
-                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
-        final Set<TaskId> cachedTasks = Utils.mkSet(
-                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
-                new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
-
-        String clientId = "client-id";
-        UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
-            @Override
-            public Set<TaskId> prevTasks() {
-                return prevTasks;
-            }
-            @Override
-            public Set<TaskId> cachedTasks() {
-                return cachedTasks;
-            }
-        };
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
-
-        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
-
-        Collections.sort(subscription.topics());
-        assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());
-
-        Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
-        standbyTasks.removeAll(prevTasks);
-
-        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
-        assertEquals(info.encode(), subscription.userData());
-    }
-
-    @Test
-    public void testAssignBasic() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-        List<String> topics = Utils.mkList("topic1", "topic2");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
-
-        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
-        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
-        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
-        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
-        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
-        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
-
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-        String client2 = "client2";
-
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
-        subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
-        subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
-
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
-        // check assigned partitions
-        assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
-                Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
-        assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
-
-        // check assignment info
-
-        Set<TaskId> allActiveTasks = new HashSet<>();
-
-        // the first consumer
-        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
-
-        // the second consumer
-        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
-        allActiveTasks.addAll(info11.activeTasks);
-
-        assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
-
-        // the third consumer
-        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
-        allActiveTasks.addAll(info20.activeTasks);
-
-        assertEquals(3, allActiveTasks.size());
-        assertEquals(allTasks, new HashSet<>(allActiveTasks));
-
-        assertEquals(3, allActiveTasks.size());
-        assertEquals(allTasks, allActiveTasks);
-    }
-
-    @Test
-    public void testAssignWithNewTasks() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addSource("source3", "topic3");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
-        List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
-
-        // assuming that previous tasks do not have topic3
-        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
-        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
-        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
-
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-        String client2 = "client2";
-
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
-        subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
-        subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
-
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
-        // check assigned partitions: since there is no previous task for topic 3, its partitions are assigned arbitrarily, so we cannot check for an exact match.
-        // Also note that previously assigned partitions/tasks may not stay on their previous hosts, since the new tasks may be assigned first
-        // and the remaining ones re-assigned to other hosts for load balancing.
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        Set<TopicPartition> allPartitions = new HashSet<>();
-        AssignmentInfo info;
-
-        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        allActiveTasks.addAll(info.activeTasks);
-        allPartitions.addAll(assignments.get("consumer10").partitions());
-
-        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        allActiveTasks.addAll(info.activeTasks);
-        allPartitions.addAll(assignments.get("consumer11").partitions());
-
-        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
-        allActiveTasks.addAll(info.activeTasks);
-        allPartitions.addAll(assignments.get("consumer20").partitions());
-
-        assertEquals(allTasks, allActiveTasks);
-        assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
-    }
-
-    @Test
-    public void testAssignWithStates() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
-        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
-
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
-        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
-        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
-
-        List<String> topics = Utils.mkList("topic1", "topic2");
-
-        TaskId task00 = new TaskId(0, 0);
-        TaskId task01 = new TaskId(0, 1);
-        TaskId task02 = new TaskId(0, 2);
-        TaskId task10 = new TaskId(1, 0);
-        TaskId task11 = new TaskId(1, 1);
-        TaskId task12 = new TaskId(1, 2);
-
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-        String client2 = "client2";
-
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
-        subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
-        subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
-
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
-        // check assigned partition size: with no previous tasks and two sub-topologies, the assignment is arbitrary, so we cannot check for an exact match
-        assertEquals(2, assignments.get("consumer10").partitions().size());
-        assertEquals(2, assignments.get("consumer11").partitions().size());
-        assertEquals(2, assignments.get("consumer20").partitions().size());
-
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
-        assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
-
-        // check tasks for state topics
-        assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
-        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
-        assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
-    }
-
-    @Test
-    public void testAssignWithStandbyReplicas() throws Exception {
-        Properties props = configProps();
-        props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
-        StreamingConfig config = new StreamingConfig(props);
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-        List<String> topics = Utils.mkList("topic1", "topic2");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
-
-
-        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
-        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
-        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
-        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
-        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
-        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
-
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-        String client2 = "client2";
-
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
-        subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
-        subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
-
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        Set<TaskId> allStandbyTasks = new HashSet<>();
-
-        // the first consumer
-        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
-        allStandbyTasks.addAll(info10.standbyTasks.keySet());
-
-        // the second consumer
-        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
-        allActiveTasks.addAll(info11.activeTasks);
-        allStandbyTasks.addAll(info11.standbyTasks.keySet());
-
-        // check active tasks assigned to the first client
-        assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
-        assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
-
-        // the third consumer
-        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
-        allActiveTasks.addAll(info20.activeTasks);
-        allStandbyTasks.addAll(info20.standbyTasks.keySet());
-
-        // all task ids are in the active tasks and also in the standby tasks
-
-        assertEquals(3, allActiveTasks.size());
-        assertEquals(allTasks, allActiveTasks);
-
-        assertEquals(3, allStandbyTasks.size());
-        assertEquals(allTasks, allStandbyTasks);
-    }
-
-    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
-
-        // This assumes 1) DefaultPartitionGrouper is used, and 2) there is only one topic group.
-
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-
-        // check if the number of assigned partitions == the size of active task id list
-        assertEquals(assignment.partitions().size(), info.activeTasks.size());
-
-        // check if active tasks are consistent
-        List<TaskId> activeTasks = new ArrayList<>();
-        Set<String> activeTopics = new HashSet<>();
-        for (TopicPartition partition : assignment.partitions()) {
-            // since default grouper, taskid.partition == partition.partition()
-            activeTasks.add(new TaskId(0, partition.partition()));
-            activeTopics.add(partition.topic());
-        }
-        assertEquals(activeTasks, info.activeTasks);
-
-        // check if active partitions cover all topics
-        assertEquals(allTopics, activeTopics);
-
-        // check if standby tasks are consistent
-        Set<String> standbyTopics = new HashSet<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
-            TaskId id = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
-            for (TopicPartition partition : partitions) {
-                // since default grouper, taskid.partition == partition.partition()
-                assertEquals(id.partition, partition.partition());
-
-                standbyTopics.add(partition.topic());
-            }
-        }
-
-        if (info.standbyTasks.size() > 0)
-            // check if standby partitions cover all topics
-            assertEquals(allTopics, standbyTopics);
-
-        return info;
-    }
-
-    @Test
-    public void testOnAssignment() throws Exception {
-        StreamingConfig config = new StreamingConfig(configProps());
-
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
-        TopicPartition t2p3 = new TopicPartition("topic2", 3);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-
-        UUID uuid = UUID.randomUUID();
-        String client1 = "client1";
-
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime());
-
-        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
-
-        List<TaskId> activeTaskList = Utils.mkList(task0, task3);
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
-        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
-
-        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
-        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
-        partitionAssignor.onAssignment(assignment);
-
-        assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
-        assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
-        assertEquals(standbyTasks, partitionAssignor.standbyTasks());
-    }
-
-}
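
For readers following the deleted test above: the round trip it exercises is AssignmentInfo.encode() producing the bytes carried in the assignment's userData, and AssignmentInfo.decode() recovering them on the receiving side. A minimal standalone sketch of that round trip, using only the calls visible in this test (topic names and task ids here are illustrative):

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.utils.Utils;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;

    public class AssignmentInfoRoundTrip {
        public static void main(String[] args) {
            // two active tasks and one standby task, mirroring the test above
            Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
            standbyTasks.put(new TaskId(0, 1), Utils.mkSet(new TopicPartition("t1", 0)));

            AssignmentInfo info = new AssignmentInfo(Utils.mkList(new TaskId(0, 0), new TaskId(0, 3)), standbyTasks);

            // encode to the bytes that travel in the assignment's userData ...
            ByteBuffer userData = info.encode();

            // ... and decode them back on the receiving side
            AssignmentInfo decoded = AssignmentInfo.decode(userData);
            System.out.println(decoded.activeTasks);           // e.g. [0_0, 0_3]
            System.out.println(decoded.standbyTasks.keySet()); // e.g. [0_1]
        }
    }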

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index f2ef2ea..60bd309 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -27,12 +27,12 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -60,21 +60,22 @@ public class ProcessorTopologyTest {
     private static long timestamp = 1000L;
 
     private ProcessorTopologyTestDriver driver;
-    private StreamingConfig config;
+    private StreamsConfig config;
 
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
         File localState = StateUtils.tempDir();
         Properties props = new Properties();
-        props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamingConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
-        props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
-        props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        this.config = new StreamingConfig(props);
+        props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
+        props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+        props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        this.config = new StreamsConfig(props);
     }
 
     @After
@@ -193,8 +194,8 @@ public class ProcessorTopologyTest {
         assertNull(driver.readOutput(topic));
     }
 
-    protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
-        return new StreamPartitioner<K, V>() {
+    protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) {
+        return new StreamsPartitioner<K, V>() {
             @Override
             public Integer partition(K key, V value, int numPartitions) {
                 return partition;
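
The constantPartitioner helper above pins every record to a fixed partition. As a sketch of a non-trivial implementation of the renamed interface, a StreamsPartitioner that routes by key hash; only the single partition(key, value, numPartitions) method shown in this diff is assumed:

    import org.apache.kafka.streams.processor.StreamsPartitioner;

    public class KeyHashPartitioner<K, V> implements StreamsPartitioner<K, V> {
        @Override
        public Integer partition(K key, V value, int numPartitions) {
            if (key == null)
                return null; // assumption: a null return defers to the default partitioning
            // mask the sign bit so the modulo result is a valid partition index
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }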

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 85a8a15..fd604b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -85,17 +85,18 @@ public class StandbyTaskTest {
             )
     );
 
-    private StreamingConfig createConfig(final File baseDir) throws Exception {
-        return new StreamingConfig(new Properties() {
+    private StreamsConfig createConfig(final File baseDir) throws Exception {
+        return new StreamsConfig(new Properties() {
             {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test");
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
             }
         });
     }
@@ -130,7 +131,7 @@ public class StandbyTaskTest {
     public void testStorePartitions() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
@@ -145,7 +146,7 @@ public class StandbyTaskTest {
     public void testUpdateNonPersistentStore() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -164,7 +165,7 @@ public class StandbyTaskTest {
     public void testUpdate() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -227,7 +228,7 @@ public class StandbyTaskTest {
                     new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0])
             ));
 
-            StreamingConfig config = createConfig(baseDir);
+            StreamsConfig config = createConfig(baseDir);
             StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));


[4/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams

Posted by gw...@apache.org.
KAFKA-3136: Rename KafkaStreaming to KafkaStreams

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Gwen Shapira

Closes #800 from guozhangwang/KRename


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21c6cfe5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21c6cfe5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21c6cfe5

Branch: refs/heads/trunk
Commit: 21c6cfe50dbe818a392c28f48ce8891f7f99aaf6
Parents: 91ba074
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jan 22 13:00:00 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Jan 22 13:00:00 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreaming.java    | 167 ------
 .../org/apache/kafka/streams/KafkaStreams.java  | 171 +++++++
 .../java/org/apache/kafka/streams/KeyValue.java |  37 ++
 .../apache/kafka/streams/StreamingConfig.java   | 314 ------------
 .../apache/kafka/streams/StreamingMetrics.java  |  27 -
 .../org/apache/kafka/streams/StreamsConfig.java | 303 +++++++++++
 .../apache/kafka/streams/StreamsMetrics.java    |  27 +
 .../kafka/streams/examples/KStreamJob.java      |  24 +-
 .../kafka/streams/examples/ProcessorJob.java    |  34 +-
 .../apache/kafka/streams/kstream/KStream.java   |   1 +
 .../apache/kafka/streams/kstream/KTable.java    |   1 +
 .../apache/kafka/streams/kstream/KeyValue.java  |  34 --
 .../kstream/internals/KStreamAggregate.java     |   2 +-
 .../kstream/internals/KStreamFlatMap.java       |   2 +-
 .../streams/kstream/internals/KStreamImpl.java  |  10 +-
 .../kstream/internals/KStreamKStreamJoin.java   |   2 +-
 .../streams/kstream/internals/KStreamMap.java   |   2 +-
 .../kstream/internals/KStreamReduce.java        |   2 +-
 .../kstream/internals/KStreamTransform.java     |   2 +-
 .../streams/kstream/internals/KTableImpl.java   |   2 +-
 .../kstream/internals/KTableRepartitionMap.java |   2 +-
 .../internals/WindowedStreamPartitioner.java    |  52 --
 .../internals/WindowedStreamsPartitioner.java   |  52 ++
 .../streams/processor/ProcessorContext.java     |   6 +-
 .../streams/processor/StreamPartitioner.java    |  59 ---
 .../streams/processor/StreamsPartitioner.java   |  59 +++
 .../streams/processor/TopologyBuilder.java      |  76 +--
 .../processor/internals/AbstractTask.java       |   6 +-
 .../KafkaStreamingPartitionAssignor.java        | 483 ------------------
 .../internals/ProcessorContextImpl.java         |  12 +-
 .../processor/internals/RecordCollector.java    |   4 +-
 .../streams/processor/internals/SinkNode.java   |   6 +-
 .../processor/internals/StandbyContextImpl.java |  12 +-
 .../processor/internals/StandbyTask.java        |  12 +-
 .../internals/StreamPartitionAssignor.java      | 483 ++++++++++++++++++
 .../streams/processor/internals/StreamTask.java |  16 +-
 .../processor/internals/StreamThread.java       |  48 +-
 .../org/apache/kafka/streams/state/Entry.java   |  42 --
 .../kafka/streams/state/KeyValueIterator.java   |   4 +-
 .../kafka/streams/state/KeyValueStore.java      |   3 +-
 .../streams/state/WindowStoreIterator.java      |   2 +-
 .../InMemoryKeyValueStoreSupplier.java          |  12 +-
 .../InMemoryLRUCacheStoreSupplier.java          |  12 +-
 .../state/internals/MeteredKeyValueStore.java   |  14 +-
 .../state/internals/MeteredWindowStore.java     |   6 +-
 .../streams/state/internals/RocksDBStore.java   |  16 +-
 .../state/internals/RocksDBWindowStore.java     |   9 +-
 .../kafka/streams/StreamingConfigTest.java      |  75 ---
 .../apache/kafka/streams/StreamsConfigTest.java |  76 +++
 .../kstream/internals/KStreamFlatMapTest.java   |   2 +-
 .../internals/KStreamKTableLeftJoinTest.java    |   2 +-
 .../kstream/internals/KStreamMapTest.java       |   2 +-
 .../kstream/internals/KStreamTransformTest.java |   2 +-
 .../kstream/internals/KTableKTableJoinTest.java |   2 +-
 .../internals/KTableKTableLeftJoinTest.java     |   2 +-
 .../internals/KTableKTableOuterJoinTest.java    |   2 +-
 .../WindowedStreamPartitionerTest.java          |  84 ---
 .../WindowedStreamsPartitionerTest.java         |  84 +++
 .../KafkaStreamingPartitionAssignorTest.java    | 508 ------------------
 .../internals/ProcessorTopologyTest.java        |  27 +-
 .../processor/internals/StandbyTaskTest.java    |  31 +-
 .../internals/StreamPartitionAssignorTest.java  | 509 +++++++++++++++++++
 .../processor/internals/StreamTaskTest.java     |  27 +-
 .../processor/internals/StreamThreadTest.java   |  37 +-
 .../streams/state/KeyValueStoreTestDriver.java  |  45 +-
 .../internals/AbstractKeyValueStoreTest.java    |  22 +-
 .../state/internals/RocksDBWindowStoreTest.java |  38 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   4 +-
 .../apache/kafka/test/MockProcessorContext.java |  14 +-
 .../apache/kafka/test/NoOpKeyValueMapper.java   |   2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  32 +-
 71 files changed, 2133 insertions(+), 2168 deletions(-)
----------------------------------------------------------------------
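
In user code, the rename amounts to swapping the entry-point and config classes; a minimal before/after sketch, based on the examples in the class javadocs below:

    // before
    StreamingConfig config = new StreamingConfig(props);
    KafkaStreaming streaming = new KafkaStreaming(builder, config);
    streaming.start();

    // after
    StreamsConfig config = new StreamsConfig(props);
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();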


http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
deleted file mode 100644
index 0d99739..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
- * sends output to zero or more output topics.
- * <p>
- * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify
- * the transformation.
- * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and
- * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
- * <p>
- * This streaming instance will co-ordinate with any other instances (whether in this same process, on other processes
- * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
- * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or
- * started in the appropriate processes to balance processing load.
- * <p>
- * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
- * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
- * <p>
- * A simple example might look like this:
- * <pre>
- *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
- *    props.put("bootstrap.servers", "localhost:4242");
- *    props.put("key.deserializer", StringDeserializer.class);
- *    props.put("value.deserializer", StringDeserializer.class);
- *    props.put("key.serializer", StringSerializer.class);
- *    props.put("value.serializer", IntegerSerializer.class);
- *    props.put("timestamp.extractor", MyTimestampExtractor.class);
- *    StreamingConfig config = new StreamingConfig(props);
- *
- *    KStreamBuilder builder = new KStreamBuilder();
- *    builder.from("topic1").mapValue(value -&gt; value.length()).to("topic2");
- *
- *    KafkaStreaming streaming = new KafkaStreaming(builder, config);
- *    streaming.start();
- * </pre>
- *
- */
-public class KafkaStreaming {
-
-    private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
-    private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
-    private static final String JMX_PREFIX = "kafka.streams";
-
-    // container states
-    private static final int CREATED = 0;
-    private static final int RUNNING = 1;
-    private static final int STOPPED = 2;
-    private int state = CREATED;
-
-    private final StreamThread[] threads;
-
-    // processId is expected to be unique across JVMs and to be used
-    // in userData of the subscription request to allow assignor be aware
-    // of the co-location of stream thread's consumers. It is for internal
-    // usage only and should not be exposed to users at all.
-    private final UUID processId;
-
-    public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
-        // create the metrics
-        Time time = new SystemTime();
-
-        this.processId = UUID.randomUUID();
-
-        String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG);
-        if (jobId.length() <= 0)
-            jobId = "kafka-streams";
-
-        String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
-        if (clientId.length() <= 0)
-            clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
-
-        List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
-        reporters.add(new JmxReporter(JMX_PREFIX));
-
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
-            .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                TimeUnit.MILLISECONDS);
-
-        Metrics metrics = new Metrics(metricConfig, reporters, time);
-
-        this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
-        for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
-        }
-    }
-
-    /**
-     * Start the stream process by starting all its threads
-     */
-    public synchronized void start() {
-        log.debug("Starting Kafka Stream process");
-
-        if (state == CREATED) {
-            for (StreamThread thread : threads)
-                thread.start();
-
-            state = RUNNING;
-
-            log.info("Started Kafka Stream process");
-        } else {
-            throw new IllegalStateException("This process was already started.");
-        }
-    }
-
-    /**
-     * Shutdown this stream process by signaling the threads to stop,
-     * wait for them to join and clean up the process instance.
-     */
-    public synchronized void close() {
-        log.debug("Stopping Kafka Stream process");
-
-        if (state == RUNNING) {
-            // signal the threads to stop and wait
-            for (StreamThread thread : threads)
-                thread.close();
-
-            for (StreamThread thread : threads) {
-                try {
-                    thread.join();
-                } catch (InterruptedException ex) {
-                    Thread.interrupted();
-                }
-            }
-
-            state = STOPPED;
-
-            log.info("Stopped Kafka Stream process");
-        } else {
-            throw new IllegalStateException("This process has not started yet.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
new file mode 100644
index 0000000..071cef6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
+ * sending output to zero or more output topics.
+ * <p>
+ * This processing is defined by using the {@link TopologyBuilder} class or its subclass KStreamBuilder to specify
+ * the transformation.
+ * The {@link KafkaStreams} instance will be responsible for the lifecycle of these processors. It will instantiate and
+ * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
+ * <p>
+ * This {@link KafkaStreams} instance will coordinate with any other instances (whether in this same process, in other processes
+ * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
+ * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shut down or
+ * started in the appropriate processes to balance processing load.
+ * <p>
+ * Internally the {@link KafkaStreams} instance contains normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
+ * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instances that are used for reading input and writing output.
+ * <p>
+ * A simple example might look like this:
+ * <pre>
+ *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
+ *    props.put("bootstrap.servers", "localhost:4242");
+ *    props.put("key.deserializer", StringDeserializer.class);
+ *    props.put("value.deserializer", StringDeserializer.class);
+ *    props.put("key.serializer", StringSerializer.class);
+ *    props.put("value.serializer", IntegerSerializer.class);
+ *    props.put("timestamp.extractor", MyTimestampExtractor.class);
+ *    StreamsConfig config = new StreamsConfig(props);
+ *
+ *    KStreamBuilder builder = new KStreamBuilder();
+ *    builder.from("topic1").mapValue(value -&gt; value.length()).to("topic2");
+ *
+ *    KafkaStreams streams = new KafkaStreams(builder, config);
+ *    streams.start();
+ * </pre>
+ *
+ */
+public class KafkaStreams {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
+    private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.streams";
+
+    // container states
+    private static final int CREATED = 0;
+    private static final int RUNNING = 1;
+    private static final int STOPPED = 2;
+    private int state = CREATED;
+
+    private final StreamThread[] threads;
+
+    // processId is expected to be unique across JVMs and is used in the
+    // userData of the subscription request to let the assignor be aware
+    // of the co-location of stream threads' consumers. It is for internal
+    // use only and should not be exposed to users at all.
+    private final UUID processId;
+
+    public KafkaStreams(TopologyBuilder builder, Properties props) {
+        this(builder, new StreamsConfig(props));
+    }
+
+    public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
+        // create the metrics
+        Time time = new SystemTime();
+
+        this.processId = UUID.randomUUID();
+
+        // job.id is a required config and hence should always have a value
+        String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG);
+
+        String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
+        if (clientId.length() <= 0)
+            clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+
+        List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                MetricsReporter.class);
+        reporters.add(new JmxReporter(JMX_PREFIX));
+
+        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                TimeUnit.MILLISECONDS);
+
+        Metrics metrics = new Metrics(metricConfig, reporters, time);
+
+        this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        for (int i = 0; i < this.threads.length; i++) {
+            this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
+        }
+    }
+
+    /**
+     * Start the stream process by starting all its threads
+     */
+    public synchronized void start() {
+        log.debug("Starting Kafka Stream process");
+
+        if (state == CREATED) {
+            for (StreamThread thread : threads)
+                thread.start();
+
+            state = RUNNING;
+
+            log.info("Started Kafka Stream process");
+        } else {
+            throw new IllegalStateException("This process was already started.");
+        }
+    }
+
+    /**
+     * Shutdown this stream process by signaling the threads to stop,
+     * wait for them to join and clean up the process instance.
+     */
+    public synchronized void close() {
+        log.debug("Stopping Kafka Stream process");
+
+        if (state == RUNNING) {
+            // signal the threads to stop and wait
+            for (StreamThread thread : threads)
+                thread.close();
+
+            for (StreamThread thread : threads) {
+                try {
+                    thread.join();
+                } catch (InterruptedException ex) {
+                    Thread.interrupted();
+                }
+            }
+
+            state = STOPPED;
+
+            log.info("Stopped Kafka Stream process");
+        } else {
+            throw new IllegalStateException("This process has not started yet.");
+        }
+    }
+}
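
Note the new convenience constructor KafkaStreams(TopologyBuilder, Properties) above, which wraps new StreamsConfig(props) internally. A schematic usage sketch, mirroring the class javadoc's example topology (MyTimestampExtractor is a placeholder, as in the javadoc):

    Properties props = new Properties();
    props.setProperty(StreamsConfig.JOB_ID_CONFIG, "example-job");   // now required, see StreamsConfig below
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class.getName());

    KStreamBuilder builder = new KStreamBuilder();
    builder.from("topic1").mapValue(value -> value.length()).to("topic2");

    KafkaStreams streams = new KafkaStreams(builder, props); // no explicit StreamsConfig needed
    streams.start();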

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
new file mode 100644
index 0000000..472e677
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+public class KeyValue<K, V> {
+
+    public final K key;
+    public final V value;
+
+    public KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public static <K, V> KeyValue<K, V> pair(K key, V value) {
+        return new KeyValue<>(key, value);
+    }
+
+    public String toString() {
+        return "KeyValue(" + key + ", " + value + ")";
+    }
+}
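
The new top-level KeyValue (replacing kstream.KeyValue per the file summary above) is a plain immutable pair with a static factory:

    KeyValue<String, Integer> kv = KeyValue.pair("answer", 42);
    System.out.println(kv.key + " -> " + kv.value); // answer -> 42
    System.out.println(kv);                         // KeyValue(answer, 42)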

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
deleted file mode 100644
index e89d030..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-
-import java.util.Map;
-
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
-public class StreamingConfig extends AbstractConfig {
-
-    private static final ConfigDef CONFIG;
-
-    /** <code>state.dir</code> */
-    public static final String STATE_DIR_CONFIG = "state.dir";
-    private static final String STATE_DIR_DOC = "Directory location for state store.";
-
-    /** <code>zookeeper.connect<code/> */
-    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
-    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
-
-    /** <code>commit.interval.ms</code> */
-    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
-    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
-
-    /** <code>poll.ms</code> */
-    public static final String POLL_MS_CONFIG = "poll.ms";
-    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
-
-    /** <code>num.stream.threads</code> */
-    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
-    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
-
-    /** <code>num.stream.threads</code> */
-    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
-    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
-
-    /** <code>buffered.records.per.partition</code> */
-    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
-    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
-
-    /** <code>state.cleanup.delay</code> */
-    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
-    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
-
-    /** <code>total.records.to.process</code> */
-    public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
-    private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
-
-    /** <code>window.time.ms</code> */
-    public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms";
-    private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called "
-                                                     + "with this frequency even if there is no message.";
-
-    /** <code>timestamp.extractor</code> */
-    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
-    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
-
-    /** <code>partition.grouper</code> */
-    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
-    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
-
-    /** <code>job.id</code> */
-    public static final String JOB_ID_CONFIG = "job.id";
-    public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
-
-    /** <code>key.serializer</code> */
-    public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
-    /** <code>value.serializer</code> */
-    public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
-    /** <code>key.deserializer</code> */
-    public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-
-    /** <code>value.deserializer</code> */
-    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
-    /** <code>metrics.sample.window.ms</code> */
-    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
-
-    /** <code>metrics.num.samples</code> */
-    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
-
-    /** <code>metric.reporters</code> */
-    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
-
-    /** <code>bootstrap.servers</code> */
-    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-
-    /** <code>client.id</code> */
-    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
-
-    private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
-
-    static {
-        CONFIG = new ConfigDef().define(JOB_ID_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.MEDIUM,
-                                        StreamingConfig.JOB_ID_DOC)
-                                .define(CLIENT_ID_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.CLIENT_ID_DOC)
-                                .define(ZOOKEEPER_CONNECT_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.HIGH,
-                                        StreamingConfig.ZOOKEEPER_CONNECT_DOC)
-                                .define(STATE_DIR_CONFIG,
-                                        Type.STRING,
-                                        SYSTEM_TEMP_DIRECTORY,
-                                        Importance.MEDIUM,
-                                        STATE_DIR_DOC)
-                                .define(COMMIT_INTERVAL_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        Importance.HIGH,
-                                        COMMIT_INTERVAL_MS_DOC)
-                                .define(POLL_MS_CONFIG,
-                                        Type.LONG,
-                                        100,
-                                        Importance.LOW,
-                                        POLL_MS_DOC)
-                                .define(NUM_STREAM_THREADS_CONFIG,
-                                        Type.INT,
-                                        1,
-                                        Importance.LOW,
-                                        NUM_STREAM_THREADS_DOC)
-                                .define(NUM_STANDBY_REPLICAS_CONFIG,
-                                        Type.INT,
-                                        0,
-                                        Importance.LOW,
-                                        NUM_STANDBY_REPLICAS_DOC)
-                                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
-                                        Type.INT,
-                                        1000,
-                                        Importance.LOW,
-                                        BUFFERED_RECORDS_PER_PARTITION_DOC)
-                                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
-                                        Type.LONG,
-                                        60000,
-                                        Importance.LOW,
-                                        STATE_CLEANUP_DELAY_MS_DOC)
-                                .define(TOTAL_RECORDS_TO_PROCESS,
-                                        Type.LONG,
-                                        -1L,
-                                        Importance.LOW,
-                                        TOTAL_RECORDS_TO_DOC)
-                                .define(WINDOW_TIME_MS_CONFIG,
-                                        Type.LONG,
-                                        -1L,
-                                        Importance.MEDIUM,
-                                        WINDOW_TIME_MS_DOC)
-                                .define(KEY_SERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
-                                .define(KEY_DESERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
-                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
-                                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
-                                .define(PARTITION_GROUPER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        DefaultPartitionGrouper.class,
-                                        Importance.HIGH,
-                                        PARTITION_GROUPER_CLASS_DOC)
-                                .define(BOOTSTRAP_SERVERS_CONFIG,
-                                        Type.STRING,
-                                        Importance.HIGH,
-                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG,
-                                        Type.LIST,
-                                        "",
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
-                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
-                                .define(METRICS_NUM_SAMPLES_CONFIG,
-                                        Type.INT,
-                                        2,
-                                        atLeast(1),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
-    }
-
-    public static class InternalConfig {
-        public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
-    }
-
-    public StreamingConfig(Map<?, ?> props) {
-        super(CONFIG, props);
-    }
-
-    public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
-        Map<String, Object> props = getBaseConsumerConfigs();
-
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
-        props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
-        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
-
-        props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
-
-        return props;
-    }
-
-    public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
-        Map<String, Object> props = getBaseConsumerConfigs();
-
-        // no need to set group id for a restore consumer
-        props.remove(ConsumerConfig.GROUP_ID_CONFIG);
-
-        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
-
-        return props;
-    }
-
-    private Map<String, Object> getBaseConsumerConfigs() {
-        Map<String, Object> props = this.originals();
-
-        // set consumer default property values
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        // remove properties that are not required for consumers
-        props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-        props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
-        return props;
-    }
-
-    public Map<String, Object> getProducerConfigs(String clientId) {
-        Map<String, Object> props = this.originals();
-
-        // set producer default property values
-        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-
-        // remove properties that are not required for producers
-        props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-
-        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
-
-        return props;
-    }
-
-    public Serializer keySerializer() {
-        return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-    }
-
-    public Serializer valueSerializer() {
-        return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-    }
-
-    public Deserializer keyDeserializer() {
-        return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-    }
-
-    public Deserializer valueDeserializer() {
-        return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-    }
-
-    public static void main(String[] args) {
-        System.out.println(CONFIG.toHtmlTable());
-    }
-}
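
One behavioral change worth noting: above, job.id was defined with a default of "" and KafkaStreaming substituted "kafka-streams" when it was empty; in the new StreamsConfig below it is defined with no default, so constructing the config without it fails fast. A sketch of the new behavior (the ConfigException from ConfigDef's required-value check is assumed):

    Properties props = new Properties();
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // ... serializers, deserializers, timestamp extractor ...

    try {
        new StreamsConfig(props); // job.id not set
    } catch (ConfigException e) {
        // previously StreamingConfig accepted this and KafkaStreaming fell back to "kafka-streams"
        System.err.println("missing required config: " + e.getMessage());
    }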

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
deleted file mode 100644
index ebf80b3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.Sensor;
-
-public interface StreamingMetrics {
-
-    Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
-
-    void recordLatency(Sensor sensor, long startNs, long endNs);
-}
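
Per the file summary (27 lines removed, 27 added), this interface reappears as StreamsMetrics. Assuming the renamed interface keeps the two methods above, a processor would typically time an operation like this (the scope/entity/operation names are illustrative):

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.StreamsMetrics;

    public class TimedOperation {
        private final StreamsMetrics metrics;
        private final Sensor processLatency;

        public TimedOperation(StreamsMetrics metrics) {
            this.metrics = metrics;
            this.processLatency = metrics.addLatencySensor("processor", "my-processor", "process");
        }

        public void process(Runnable work) {
            long startNs = System.nanoTime();
            work.run();
            metrics.recordLatency(processLatency, startNs, System.nanoTime());
        }
    }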

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
new file mode 100644
index 0000000..3843b1d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class StreamsConfig extends AbstractConfig {
+
+    private static final ConfigDef CONFIG;
+
+    /** <code>state.dir</code> */
+    public static final String STATE_DIR_CONFIG = "state.dir";
+    private static final String STATE_DIR_DOC = "Directory location for state store.";
+
+    /** <code>zookeeper.connect</code> */
+    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
+    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
+
+    /** <code>commit.interval.ms</code> */
+    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
+    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
+
+    /** <code>poll.ms</code> */
+    public static final String POLL_MS_CONFIG = "poll.ms";
+    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
+
+    /** <code>num.stream.threads</code> */
+    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
+    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+
+    /** <code>num.standby.replicas</code> */
+    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
+    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
+
+    /** <code>buffered.records.per.partition</code> */
+    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
+    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
+
+    /** <code>state.cleanup.delay.ms</code> */
+    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
+    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
+
+    /** <code>total.records.to.process</code> */
+    public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
+    private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
+
+    /** <code>timestamp.extractor</code> */
+    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
+    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+
+    /** <code>partition.grouper</code> */
+    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
+    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
+
+    /** <code>job.id</code> */
+    public static final String JOB_ID_CONFIG = "job.id";
+    public static final String JOB_ID_DOC = "An id string to identify the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, and 3) the changelog topic prefix.";
+
+    /** <code>key.serializer</code> */
+    public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+
+    /** <code>value.serializer</code> */
+    public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+    /** <code>key.deserializer</code> */
+    public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+
+    /** <code>value.deserializer</code> */
+    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+    /** <code>metrics.sample.window.ms</code> */
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+
+    /** <code>metrics.num.samples</code> */
+    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+
+    /** <code>metric.reporters</code> */
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+
+    /** <code>bootstrap.servers</code> */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+    /** <code>client.id</code> */
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
+    private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
+
+    static {
+        CONFIG = new ConfigDef().define(JOB_ID_CONFIG,      // required with no default value
+                                        Type.STRING,
+                                        Importance.HIGH,
+                                        StreamsConfig.JOB_ID_DOC)
+                                .define(BOOTSTRAP_SERVERS_CONFIG,       // required with no default value
+                                        Type.STRING,
+                                        Importance.HIGH,
+                                        CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_ID_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.HIGH,
+                                        CommonClientConfigs.CLIENT_ID_DOC)
+                                .define(ZOOKEEPER_CONNECT_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.HIGH,
+                                        StreamsConfig.ZOOKEEPER_CONNECT_DOC)
+                                .define(STATE_DIR_CONFIG,
+                                        Type.STRING,
+                                        SYSTEM_TEMP_DIRECTORY,
+                                        Importance.HIGH,
+                                        STATE_DIR_DOC)
+                                .define(KEY_SERIALIZER_CLASS_CONFIG,        // required with no default value
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG,      // required with no default value
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
+                                .define(KEY_DESERIALIZER_CLASS_CONFIG,      // required with no default value
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
+                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,    // required with no default value
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.MEDIUM,
+                                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
+                                .define(PARTITION_GROUPER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        DefaultPartitionGrouper.class,
+                                        Importance.MEDIUM,
+                                        PARTITION_GROUPER_CLASS_DOC)
+                                .define(COMMIT_INTERVAL_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        Importance.LOW,
+                                        COMMIT_INTERVAL_MS_DOC)
+                                .define(POLL_MS_CONFIG,
+                                        Type.LONG,
+                                        100,
+                                        Importance.LOW,
+                                        POLL_MS_DOC)
+                                .define(NUM_STREAM_THREADS_CONFIG,
+                                        Type.INT,
+                                        1,
+                                        Importance.LOW,
+                                        NUM_STREAM_THREADS_DOC)
+                                .define(NUM_STANDBY_REPLICAS_CONFIG,
+                                        Type.INT,
+                                        0,
+                                        Importance.LOW,
+                                        NUM_STANDBY_REPLICAS_DOC)
+                                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+                                        Type.INT,
+                                        1000,
+                                        Importance.LOW,
+                                        BUFFERED_RECORDS_PER_PARTITION_DOC)
+                                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
+                                        Type.LONG,
+                                        60000,
+                                        Importance.LOW,
+                                        STATE_CLEANUP_DELAY_MS_DOC)
+                                .define(TOTAL_RECORDS_TO_PROCESS,
+                                        Type.LONG,
+                                        -1L,
+                                        Importance.LOW,
+                                        TOTAL_RECORDS_TO_PROCESS_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        "",
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG,
+                                        Type.INT,
+                                        2,
+                                        atLeast(1),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
+    }
+
+    public static class InternalConfig {
+        public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
+    }
+
+    public StreamsConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+    public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
+        Map<String, Object> props = getBaseConsumerConfigs();
+
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
+
+        props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+
+        return props;
+    }
+
+    public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
+        Map<String, Object> props = getBaseConsumerConfigs();
+
+        // no need to set group id for a restore consumer
+        props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+
+        return props;
+    }
+
+    private Map<String, Object> getBaseConsumerConfigs() {
+        Map<String, Object> props = this.originals();
+
+        // set consumer default property values
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        // remove properties that are not required for consumers
+        props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+        props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
+        return props;
+    }
+
+    public Map<String, Object> getProducerConfigs(String clientId) {
+        Map<String, Object> props = this.originals();
+
+        // set producer default property values
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+        // remove properties that are not required for producers
+        props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
+
+        return props;
+    }
+
+    public Serializer keySerializer() {
+        return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+    }
+
+    public Serializer valueSerializer() {
+        return getConfiguredInstance(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+    }
+
+    public Deserializer keyDeserializer() {
+        return getConfiguredInstance(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    }
+
+    public Deserializer valueDeserializer() {
+        return getConfiguredInstance(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    }
+
+    public static void main(String[] args) {
+        System.out.println(CONFIG.toHtmlTable());
+    }
+}
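
A minimal usage sketch for the renamed StreamsConfig above, with placeholder job id, broker address, and serde classes (none of these values come from the patch itself):

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.JOB_ID_CONFIG, "example-job");                // required, no default
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // required, no default
        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        StreamsConfig config = new StreamsConfig(props);

        // The restore consumer gets no group id and a "-restore-consumer" client-id suffix;
        // the producer configs drop the deserializer entries set above.
        Map<String, Object> restoreConsumer = config.getRestoreConsumerConfigs("example-client");
        Map<String, Object> producer = config.getProducerConfigs("example-client");
        System.out.println(restoreConsumer.get("client.id"));  // example-client-restore-consumer
        System.out.println(producer.get("client.id"));         // example-client-producer
    }
}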

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
new file mode 100644
index 0000000..a151392
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+public interface StreamsMetrics {
+
+    Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
+
+    void recordLatency(Sensor sensor, long startNs, long endNs);
+}
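
As a sketch of how this interface is meant to be consumed from a processor, via ProcessorContext.metrics(), whose return type changes to StreamsMetrics later in this patch. The scope, entity, and operation names here are made up for illustration:

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TimedProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private StreamsMetrics metrics;
    private Sensor processLatency;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.metrics = context.metrics();
        // scope, entity, and operation names below are illustrative only
        this.processLatency = metrics.addLatencySensor("processor", "timed-processor", "process");
    }

    @Override
    public void process(String key, String value) {
        long startNs = System.nanoTime();
        context.forward(key, value);
        // record how long the forwarding step took, in nanoseconds
        metrics.recordLatency(processLatency, startNs, System.nanoTime());
    }

    @Override
    public void punctuate(long timestamp) {}

    @Override
    public void close() {}
}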

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index 88a8955..a234395 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -20,11 +20,11 @@ package org.apache.kafka.streams.examples;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 
@@ -34,14 +34,14 @@ public class KStreamJob {
 
     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
-        props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream");
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamingConfig config = new StreamingConfig(props);
+        props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        StreamsConfig config = new StreamsConfig(props);
 
         KStreamBuilder builder = new KStreamBuilder();
 
@@ -78,7 +78,7 @@ public class KStreamJob {
         streams[0].to("topic2");
         streams[1].to("topic3");
 
-        KafkaStreaming kstream = new KafkaStreaming(builder, config);
+        KafkaStreams kstream = new KafkaStreams(builder, config);
         kstream.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 2d0b79f..e17c16b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -21,13 +21,13 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -70,11 +70,11 @@ public class ProcessorJob {
                     KeyValueIterator<String, Integer> iter = this.kvStore.all();
 
                     while (iter.hasNext()) {
-                        Entry<String, Integer> entry = iter.next();
+                        KeyValue<String, Integer> entry = iter.next();
 
-                        System.out.println("[" + entry.key() + ", " + entry.value() + "]");
+                        System.out.println("[" + entry.key + ", " + entry.value + "]");
 
-                        context.forward(entry.key(), entry.value());
+                        context.forward(entry.key, entry.value);
                     }
 
                     iter.close();
@@ -90,15 +90,15 @@ public class ProcessorJob {
 
     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
-        props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor");
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamingConfig config = new StreamingConfig(props);
+        props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        StreamsConfig config = new StreamsConfig(props);
 
         TopologyBuilder builder = new TopologyBuilder();
 
@@ -109,7 +109,7 @@ public class ProcessorJob {
 
         builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
 
-        KafkaStreaming streaming = new KafkaStreaming(builder, config);
-        streaming.start();
+        KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index dfed661..26f04f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 87298d1..feb28ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
 
 /**
  * KTable is an abstraction of a change log stream.

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
deleted file mode 100644
index f633f6e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class KeyValue<K, V> {
-
-    public final K key;
-    public final V value;
-
-    public KeyValue(K key, V value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    public static <K, V> KeyValue<K, V> pair(K key, V value) {
-        return new KeyValue<>(key, value);
-    }
-
-}
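
The class body is unchanged by this move; only the package changes, and its new home under org.apache.kafka.streams is the import target seen throughout this patch. A migration sketch for call sites, using made-up key/value data:

import org.apache.kafka.streams.KeyValue;  // was: org.apache.kafka.streams.kstream.KeyValue

public class KeyValueMigrationSketch {
    public static void main(String[] args) {
        // key and value are public final fields, accessed directly rather than via methods
        KeyValue<String, Integer> pair = KeyValue.pair("count", 42);
        System.out.println(pair.key + " -> " + pair.value);
    }
}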

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 91bfa9e..26002f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index daef8b1..ff7f9ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ce89220..98e50c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.Serdes;
 
@@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
         String name = topology.newName(SINK_NAME);
-        StreamPartitioner<K, V> streamPartitioner = null;
+        StreamsPartitioner<K, V> streamsPartitioner = null;
 
         if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+            streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer);
         }
 
-        topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
+        topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 01e3325..a4ac9b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 57f1431..a40449b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 7d6eb27..c484c7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index a9d8f97..4299c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.Processor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 8ee557c..d046090 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Reducer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 12fcc17..499f721 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
deleted file mode 100644
index 10e69cc..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-
-public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
-
-    private final WindowedSerializer<K> serializer;
-
-    public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
-        this.serializer = serializer;
-    }
-
-    /**
-     * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
-     * and the current number of partitions. The partition number id determined by the original key of the windowed key
-     * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
-     *
-     * @param windowedKey the key of the message
-     * @param value the value of the message
-     * @param numPartitions the total number of partitions
-     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
-     */
-    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
-        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
-
-        // hash the keyBytes to choose a partition
-        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
-    }
-
-    private static int toPositive(int number) {
-        return number & 0x7fffffff;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
new file mode 100644
index 0000000..ff1fa2c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
+
+public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> {
+
+    private final WindowedSerializer<K> serializer;
+
+    public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) {
+        this.serializer = serializer;
+    }
+
+    /**
+     * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value
+     * and the current number of partitions. The partition number is determined from the original key of the windowed key,
+     * using the same logic as DefaultPartitioner, so that the topic is partitioned by the original key.
+     *
+     * @param windowedKey the key of the message
+     * @param value the value of the message
+     * @param numPartitions the total number of partitions
+     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+     */
+    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
+        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+
+        // hash the keyBytes to choose a partition
+        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+    }
+
+    private static int toPositive(int number) {
+        return number & 0x7fffffff;
+    }
+
+}
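
To make the hashing step concrete, a standalone sketch of the same partition arithmetic on raw key bytes (the sample key and partition count are arbitrary; a real caller would obtain keyBytes from WindowedSerializer.serializeBaseKey as above):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionSketch {
    public static void main(String[] args) {
        byte[] keyBytes = "example-key".getBytes(StandardCharsets.UTF_8); // stand-in for serializeBaseKey output
        int numPartitions = 3;

        // identical arithmetic to WindowedStreamsPartitioner.partition():
        // mask the murmur2 hash to a non-negative int, then take it modulo the partition count
        int partition = (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
        System.out.println("record with this key goes to partition " + partition);
    }
}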

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index fa19ed7..41e2235 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsMetrics;
 
 import java.io.File;
 
@@ -70,9 +70,9 @@ public interface ProcessorContext {
     /**
      * Returns Metrics instance
      *
-     * @return StreamingMetrics
+     * @return StreamsMetrics
      */
-    StreamingMetrics metrics();
+    StreamsMetrics metrics();
 
     /**
      * Registers and possibly restores the specified storage engine.