You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/01/22 22:00:11 UTC

[3/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
deleted file mode 100644
index f14d9d9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-/**
- * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
- * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
- * <p>
- * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so
- * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
- * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
- * <p>
- * When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only
- * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
- * automatically manage these instances, and adjust when new topology instances are added or removed.
- * <p>
- * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
- * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
- * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
- * determine to which partition each message should be written.
- * <p>
- * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
- * for that topic.
- * <p>
- * All StreamPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
- * 
- * @param <K> the type of keys
- * @param <V> the type of values
- * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
- *      org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
- * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
- */
-public interface StreamPartitioner<K, V> {
-
-    /**
-     * Determine the partition number for a message with the given key and value and the current number of partitions.
-     * 
-     * @param key the key of the message
-     * @param value the value of the message
-     * @param numPartitions the total number of partitions
-     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
-     */
-    Integer partition(K key, V value, int numPartitions);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
new file mode 100644
index 0000000..f8d199d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
+ * <p>
+ * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
+ * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
+ * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
+ * <p>
+ * When a topology is instantiated, each of its sources is assigned a subset of that topic's partitions. That means that only
+ * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
+ * automatically manage these instances, and adjust when new topology instances are added or removed.
+ * <p>
+ * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
+ * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each message should be written.
+ * <p>
+ * To do this, create a <code>StreamsPartitioner</code> implementation, and when you build your topology specify that custom partitioner
+ * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink}
+ * for that topic.
+ * <p>
+ * All StreamsPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
+ * 
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ *      org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...)
+ * @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...)
+ */
+public interface StreamsPartitioner<K, V> {
+
+    /**
+     * Determine the partition number for a message with the given key and value and the current number of partitions.
+     * 
+     * @param key the key of the message
+     * @param value the value of the message
+     * @param numPartitions the total number of partitions
+     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+     */
+    Integer partition(K key, V value, int numPartitions);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index d6b63d2..f4e6821 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -46,8 +46,8 @@ import java.util.Set;
  * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
  * processes that message, and optionally forwarding new messages to one or all of its children. Finally, a {@link SinkNode sink}
  * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreaming}
- * instance that will then {@link org.apache.kafka.streams.KafkaStreaming#start() begin consuming, processing, and producing messages}.
+ * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
+ * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing messages}.
  */
 public class TopologyBuilder {
 
@@ -135,9 +135,9 @@ public class TopologyBuilder {
         public final String topic;
         private Serializer keySerializer;
         private Serializer valSerializer;
-        private final StreamPartitioner partitioner;
+        private final StreamsPartitioner partitioner;
 
-        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
+        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) {
             super(name);
             this.parents = parents.clone();
             this.topic = topic;
@@ -188,9 +188,9 @@ public class TopologyBuilder {
 
     /**
      * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
@@ -208,11 +208,11 @@ public class TopologyBuilder {
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
      * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
@@ -236,18 +236,18 @@ public class TopologyBuilder {
 
     /**
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
-     * {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
+     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
-     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, StreamsPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
      */
     public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
@@ -256,11 +256,11 @@ public class TopologyBuilder {
     /**
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
      * the supplied partitioner.
-     * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
-     * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamingConfig streaming configuration}.
+     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+     * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
      * the named Kafka topic's partitions. Such control is often useful with topologies that use
      * {@link #addStateStore(StateStoreSupplier, String...) state stores}
      * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -274,9 +274,9 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
+    public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
     }
 
@@ -284,7 +284,7 @@ public class TopologyBuilder {
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
      * The sink will use the specified key and value serializers.
      * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+     * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
      * the named Kafka topic's partitions. Such control is often useful with topologies that use
      * {@link #addStateStore(StateStoreSupplier, String...) state stores}
      * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -293,20 +293,20 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * @see #addSink(String, String, StreamsPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
      */
     public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
-        return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
+        return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames);
     }
 
     /**
@@ -316,20 +316,20 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
-     * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
-     * {@link StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
-     * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param partitioner the function that should be used to determine the partition for each message processed by the sink
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, StreamsPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      */
-    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
+    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner, String... parentNames) {
         if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
@@ -589,9 +589,9 @@ public class TopologyBuilder {
 
     /**
      * Build the topology for the specified topic group. This is called automatically when passing this builder into the
-     * {@link org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)} constructor.
+     * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
      *
-     * @see org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)
+     * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
     public ProcessorTopology build(Integer topicGroupId) {
         Set<String> nodeGroup;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 569f4e6..ef4c3c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -48,7 +48,7 @@ public abstract class AbstractTask {
                            ProcessorTopology topology,
                            Consumer<byte[], byte[]> consumer,
                            Consumer<byte[], byte[]> restoreConsumer,
-                           StreamingConfig config,
+                           StreamsConfig config,
                            boolean isStandby) {
         this.id = id;
         this.partitions = new HashSet<>(partitions);
@@ -57,7 +57,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+            File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString());
             // if partitions is null, this is a standby task
             this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
deleted file mode 100644
index 2734f56..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.ClientState;
-import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.ZooDefs;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.I0Itec.zkclient.ZkClient;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
-
-    private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
-
-    private StreamThread streamThread;
-
-    private int numStandbyReplicas;
-    private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
-    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-    private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
-    private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
-    private Map<TaskId, Set<TopicPartition>> standbyTasks;
-
-    // TODO: the following ZK dependency should be removed after KIP-4
-    private static final String ZK_TOPIC_PATH = "/brokers/topics";
-    private static final String ZK_BROKER_PATH = "/brokers/ids";
-    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
-
-    private ZkClient zkClient;
-
-    private class ZKStringSerializer implements ZkSerializer {
-
-        @Override
-        public byte[] serialize(Object data) {
-            try {
-                return ((String) data).getBytes("UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-
-        @Override
-        public Object deserialize(byte[] bytes) {
-            try {
-                if (bytes == null)
-                    return null;
-                else
-                    return new String(bytes, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-    }
-
-    private List<Integer> getBrokers() {
-        List<Integer> brokers = new ArrayList<>();
-        for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
-            brokers.add(Integer.parseInt(broker));
-        }
-        Collections.sort(brokers);
-
-        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
-
-        return brokers;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
-        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
-
-        if (data == null) return null;
-
-        try {
-            ObjectMapper mapper = new ObjectMapper();
-
-            Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
-
-            });
-
-            Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
-
-            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
-
-            return partitions;
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
-        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
-
-        // we always assign leaders to brokers starting at the first one with replication factor 1
-        List<Integer> brokers = getBrokers();
-
-        Map<Integer, List<Integer>> assignment = new HashMap<>();
-        for (int i = 0; i < numPartitions; i++) {
-            assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", assignment);
-
-            ObjectMapper mapper = new ObjectMapper();
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-        } catch (JsonProcessingException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    private void deleteTopic(String topic) throws ZkNodeExistsException {
-        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
-
-        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
-    }
-
-    private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
-        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
-
-        // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
-        List<Integer> brokers = getBrokers();
-
-        int startIndex = existingAssignment.size();
-
-        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
-
-        for (int i = 0; i < numPartitions; i++) {
-            newAssignment.put(i + startIndex, Collections.singletonList(brokers.get(i + startIndex) % brokers.size()));
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", newAssignment);
-
-            ObjectMapper mapper = new ObjectMapper();
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
-        } catch (JsonProcessingException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    /**
-     * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
-     * since the former needs later's cached metadata while sending subscriptions,
-     * and the latter needs former's returned assignment when adding tasks.
-     */
-    @Override
-    public void configure(Map<String, ?> configs) {
-        numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
-        Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE);
-        if (o == null) {
-            KafkaException ex = new KafkaException("StreamThread is not specified");
-            log.error(ex.getMessage(), ex);
-            throw ex;
-        }
-
-        if (!(o instanceof StreamThread)) {
-            KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
-            log.error(ex.getMessage(), ex);
-            throw ex;
-        }
-
-        streamThread = (StreamThread) o;
-        streamThread.partitionAssignor(this);
-
-        this.topicGroups = streamThread.builder.topicGroups();
-
-        if (configs.containsKey(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG))
-            zkClient = new ZkClient((String) configs.get(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
-    }
-
-    @Override
-    public String name() {
-        return "streaming";
-    }
-
-    @Override
-    public Subscription subscription(Set<String> topics) {
-        // Adds the following information to subscription
-        // 1. Client UUID (a unique id assigned to an instance of KafkaStreaming)
-        // 2. Task ids of previously running tasks
-        // 3. Task ids of valid local states on the client's state directory.
-
-        Set<TaskId> prevTasks = streamThread.prevTasks();
-        Set<TaskId> standbyTasks = streamThread.cachedTasks();
-        standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
-
-        return new Subscription(new ArrayList<>(topics), data.encode());
-    }
-
-    @Override
-    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
-        // This assigns tasks to consumer clients in two steps.
-        // 1. using TaskAssignor tasks are assigned to streaming clients.
-        //    - Assign a task to a client which was running it previously.
-        //      If there is no such client, assign a task to a client which has its valid local state.
-        //    - A client may have more than one stream threads.
-        //      The assignor tries to assign tasks to a client proportionally to the number of threads.
-        //    - We try not to assign the same set of tasks to two different clients
-        //    We do the assignment in one-pass. The result may not satisfy above all.
-        // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
-        Map<UUID, Set<String>> consumersByClient = new HashMap<>();
-        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-
-        // Decode subscription info
-        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
-            String consumerId = entry.getKey();
-            Subscription subscription = entry.getValue();
-
-            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-
-            Set<String> consumers = consumersByClient.get(info.processId);
-            if (consumers == null) {
-                consumers = new HashSet<>();
-                consumersByClient.put(info.processId, consumers);
-            }
-            consumers.add(consumerId);
-
-            ClientState<TaskId> state = states.get(info.processId);
-            if (state == null) {
-                state = new ClientState<>();
-                states.put(info.processId, state);
-            }
-
-            state.prevActiveTasks.addAll(info.prevTasks);
-            state.prevAssignedTasks.addAll(info.prevTasks);
-            state.prevAssignedTasks.addAll(info.standbyTasks);
-            state.capacity = state.capacity + 1d;
-        }
-
-        // get the tasks as partition groups from the partition grouper
-        Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
-        }
-        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
-
-        // add tasks to state topic subscribers
-        stateChangelogTopicToTaskIds = new HashMap<>();
-        internalSourceTopicToTaskIds = new HashMap<>();
-        for (TaskId task : partitionsForTask.keySet()) {
-            for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
-                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
-                if (tasks == null) {
-                    tasks = new HashSet<>();
-                    stateChangelogTopicToTaskIds.put(stateName, tasks);
-                }
-
-                tasks.add(task);
-            }
-
-            for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
-                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
-                if (tasks == null) {
-                    tasks = new HashSet<>();
-                    internalSourceTopicToTaskIds.put(topicName, tasks);
-                }
-
-                tasks.add(task);
-            }
-        }
-
-        // assign tasks to clients
-        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
-        Map<String, Assignment> assignment = new HashMap<>();
-
-        for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
-            UUID processId = entry.getKey();
-            Set<String> consumers = entry.getValue();
-            ClientState<TaskId> state = states.get(processId);
-
-            ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
-            final int numActiveTasks = state.activeTasks.size();
-            for (TaskId taskId : state.activeTasks) {
-                taskIds.add(taskId);
-            }
-            for (TaskId id : state.assignedTasks) {
-                if (!state.activeTasks.contains(id))
-                    taskIds.add(id);
-            }
-
-            final int numConsumers = consumers.size();
-            List<TaskId> active = new ArrayList<>();
-            Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
-
-            int i = 0;
-            for (String consumer : consumers) {
-                List<TopicPartition> activePartitions = new ArrayList<>();
-
-                final int numTaskIds = taskIds.size();
-                for (int j = i; j < numTaskIds; j += numConsumers) {
-                    TaskId taskId = taskIds.get(j);
-                    if (j < numActiveTasks) {
-                        for (TopicPartition partition : partitionsForTask.get(taskId)) {
-                            activePartitions.add(partition);
-                            active.add(taskId);
-                        }
-                    } else {
-                        Set<TopicPartition> standbyPartitions = standby.get(taskId);
-                        if (standbyPartitions == null) {
-                            standbyPartitions = new HashSet<>();
-                            standby.put(taskId, standbyPartitions);
-                        }
-                        standbyPartitions.addAll(partitionsForTask.get(taskId));
-                    }
-                }
-
-                AssignmentInfo data = new AssignmentInfo(active, standby);
-                assignment.put(consumer, new Assignment(activePartitions, data.encode()));
-                i++;
-
-                active.clear();
-                standby.clear();
-            }
-        }
-
-        // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
-        if (zkClient != null) {
-            log.debug("Starting to validate changelog topics in partition assignor.");
-
-            Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
-            topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
-            topicToTaskIds.putAll(internalSourceTopicToTaskIds);
-
-            for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
-                String topic = streamThread.jobId + "-" + entry.getKey();
-
-                // the expected number of partitions is the max value of TaskId.partition + 1
-                int numPartitions = 0;
-                for (TaskId task : entry.getValue()) {
-                    if (numPartitions < task.partition + 1)
-                        numPartitions = task.partition + 1;
-                }
-
-                boolean topicNotReady = true;
-
-                while (topicNotReady) {
-                    Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
-
-                    // if topic does not exist, create it
-                    if (topicMetadata == null) {
-                        try {
-                            createTopic(topic, numPartitions);
-                        } catch (ZkNodeExistsException e) {
-                            // ignore and continue
-                        }
-                    } else {
-                        if (topicMetadata.size() > numPartitions) {
-                            // else if topic exists with more #.partitions than needed, delete in order to re-create it
-                            try {
-                                deleteTopic(topic);
-                            } catch (ZkNodeExistsException e) {
-                                // ignore and continue
-                            }
-                        } else if (topicMetadata.size() < numPartitions) {
-                            // else if topic exists with less #.partitions than needed, add partitions
-                            try {
-                                addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
-                            } catch (ZkNoNodeException e) {
-                                // ignore and continue
-                            }
-                        }
-
-                        topicNotReady = false;
-                    }
-                }
-
-                // wait until the topic metadata has been propagated to all brokers
-                List<PartitionInfo> partitions;
-                do {
-                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
-                } while (partitions == null || partitions.size() != numPartitions);
-            }
-
-            log.info("Completed validating changelog topics in partition assignor.");
-        }
-
-        return assignment;
-    }
-
-    @Override
-    public void onAssignment(Assignment assignment) {
-        List<TopicPartition> partitions = assignment.partitions();
-
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-        this.standbyTasks = info.standbyTasks;
-
-        Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
-        Iterator<TaskId> iter = info.activeTasks.iterator();
-        for (TopicPartition partition : partitions) {
-            Set<TaskId> taskIds = partitionToTaskIds.get(partition);
-            if (taskIds == null) {
-                taskIds = new HashSet<>();
-                partitionToTaskIds.put(partition, taskIds);
-            }
-
-            if (iter.hasNext()) {
-                taskIds.add(iter.next());
-            } else {
-                TaskAssignmentException ex = new TaskAssignmentException(
-                        "failed to find a task id for the partition=" + partition.toString() +
-                        ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
-                );
-                log.error(ex.getMessage(), ex);
-                throw ex;
-            }
-        }
-        this.partitionToTaskIds = partitionToTaskIds;
-    }
-
-    /* For Test Only */
-    public Set<TaskId> tasksForState(String stateName) {
-        return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
-    }
-
-    public Set<TaskId> tasksForPartition(TopicPartition partition) {
-        return partitionToTaskIds.get(partition);
-    }
-
-    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-        return standbyTasks;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3429df3..102c534 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -38,7 +38,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
 
     private final TaskId id;
     private final StreamTask task;
-    private final StreamingMetrics metrics;
+    private final StreamsMetrics metrics;
     private final RecordCollector collector;
     private final ProcessorStateManager stateMgr;
 
@@ -52,10 +52,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     @SuppressWarnings("unchecked")
     public ProcessorContextImpl(TaskId id,
                                 StreamTask task,
-                                StreamingConfig config,
+                                StreamsConfig config,
                                 RecordCollector collector,
                                 ProcessorStateManager stateMgr,
-                                StreamingMetrics metrics) {
+                                StreamsMetrics metrics) {
         this.id = id;
         this.task = task;
         this.metrics = metrics;
@@ -113,7 +113,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public StreamingMetrics metrics() {
+    public StreamsMetrics metrics() {
         return metrics;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index fe0472e..25c663d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +72,7 @@ public class RecordCollector {
     }
 
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                            StreamPartitioner<K, V> partitioner) {
+                            StreamsPartitioner<K, V> partitioner) {
         byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
         byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
         Integer partition = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 7ab59ee..88b3f56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 
 public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
     private final String topic;
     private Serializer<K> keySerializer;
     private Serializer<V> valSerializer;
-    private final StreamPartitioner<K, V> partitioner;
+    private final StreamsPartitioner<K, V> partitioner;
 
     private ProcessorContext context;
 
-    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
+    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner) {
         super(name);
 
         this.topic = topic;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index e0583e3..89d185c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -36,7 +36,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
 
     private final TaskId id;
-    private final StreamingMetrics metrics;
+    private final StreamsMetrics metrics;
     private final ProcessorStateManager stateMgr;
 
     private final Serializer<?> keySerializer;
@@ -47,9 +47,9 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private boolean initialized;
 
     public StandbyContextImpl(TaskId id,
-                              StreamingConfig config,
+                              StreamsConfig config,
                               ProcessorStateManager stateMgr,
-                              StreamingMetrics metrics) {
+                              StreamsMetrics metrics) {
         this.id = id;
         this.metrics = metrics;
         this.stateMgr = stateMgr;
@@ -105,7 +105,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     }
 
     @Override
-    public StreamingMetrics metrics() {
+    public StreamsMetrics metrics() {
         return metrics;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 4cc4ea4..861b830 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -21,8 +21,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +50,8 @@ public class StandbyTask extends AbstractTask {
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
      * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
-     * @param config                the {@link StreamingConfig} specified by the user
-     * @param metrics               the {@link StreamingMetrics} created by the thread
+     * @param config                the {@link StreamsConfig} specified by the user
+     * @param metrics               the {@link StreamsMetrics} created by the thread
      */
     public StandbyTask(TaskId id,
                        String jobId,
@@ -59,8 +59,8 @@ public class StandbyTask extends AbstractTask {
                        ProcessorTopology topology,
                        Consumer<byte[], byte[]> consumer,
                        Consumer<byte[], byte[]> restoreConsumer,
-                       StreamingConfig config,
-                       StreamingMetrics metrics) {
+                       StreamsConfig config,
+                       StreamsMetrics metrics) {
         super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
 
         // initialize the topology with its own context

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
new file mode 100644
index 0000000..5d87e5a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.ClientState;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.ZooDefs;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.I0Itec.zkclient.ZkClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+
+    private StreamThread streamThread;
+
+    private int numStandbyReplicas;
+    private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
+    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
+    private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
+    private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
+    private Map<TaskId, Set<TopicPartition>> standbyTasks;
+
+    // TODO: the following ZK dependency should be removed after KIP-4
+    private static final String ZK_TOPIC_PATH = "/brokers/topics"; // parent path of the per-topic nodes read by getTopicMetadata and written by createTopic
+    private static final String ZK_BROKER_PATH = "/brokers/ids"; // its children are parsed as integer broker ids by getBrokers
+    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics"; // deleteTopic creates a per-topic node under this path
+
+    private ZkClient zkClient; // client used by the ZK helper methods of this class
+
+    /** Stores ZK node payloads as UTF-8 strings. Static because it reads no state of the enclosing assignor. */
+    private static class ZKStringSerializer implements ZkSerializer {
+
+        /** Encodes {@code data}, which must be a String, as UTF-8 bytes. */
+        @Override
+        public byte[] serialize(Object data) {
+            try {
+                return ((String) data).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e); // unreachable in practice: UTF-8 is a mandatory JVM charset
+            }
+        }
+
+        /** Decodes UTF-8 {@code bytes} into a String; null bytes yield null. */
+        @Override
+        public Object deserialize(byte[] bytes) {
+            if (bytes == null) return null;
+            try {
+                return new String(bytes, "UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+    }
+
+    /** Reads the children of ZK_BROKER_PATH, parses each as an integer broker id, and returns the ids in ascending order. */
+    private List<Integer> getBrokers() {
+        List<Integer> brokerIds = new ArrayList<>();
+        for (String child : zkClient.getChildren(ZK_BROKER_PATH)) {
+            int brokerId = Integer.parseInt(child);
+            brokerIds.add(brokerId);
+        }
+        Collections.sort(brokerIds);
+        log.debug("Read brokers {} from ZK in partition assignor.", brokerIds);
+        return brokerIds;
+    }
+
+    /**
+     * Reads the partition-to-replica assignment stored as JSON in the topic's node under ZK_TOPIC_PATH, keyed by partition id.
+     * Returns null when ZK yields no data for the node or the JSON has no "partitions" entry; JSON errors become KafkaException.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
+        if (data == null) return null;
+        try {
+            Map<String, Object> dataMap = new ObjectMapper().readValue(data, new TypeReference<Map<String, Object>>() { });
+            Map<String, List<Integer>> raw = (Map<String, List<Integer>>) dataMap.get("partitions"); // JSON object keys are always strings
+            if (raw == null) return null;
+            Map<Integer, List<Integer>> partitions = new HashMap<>();
+            for (Map.Entry<String, List<Integer>> entry : raw.entrySet()) {
+                partitions.put(Integer.parseInt(entry.getKey()), entry.getValue());
+            }
+            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
+            return partitions;
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Create a topic by writing its partition assignment as JSON under {@code ZK_TOPIC_PATH} in ZK.
+     * Every partition gets exactly one replica, picked round-robin from the sorted broker list
+     * starting at the first broker.
+     *
+     * @param topic         the name of the topic to create
+     * @param numPartitions the number of partitions of the new topic
+     * @throws KafkaException if the assignment cannot be serialized to JSON
+     */
+    private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
+        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
+
+        List<Integer> brokerIds = getBrokers();
+        final int numBrokers = brokerIds.size();
+
+        Map<Integer, List<Integer>> replicasByPartition = new HashMap<>();
+        int partition = 0;
+        while (partition < numPartitions) {
+            Integer leader = brokerIds.get(partition % numBrokers);
+            replicasByPartition.put(partition, Collections.singletonList(leader));
+            partition++;
+        }
+
+        Map<String, Object> topicData = new HashMap<>();
+        topicData.put("version", 1);
+        topicData.put("partitions", replicasByPartition);
+
+        String json;
+        try {
+            json = new ObjectMapper().writeValueAsString(topicData);
+        } catch (JsonProcessingException e) {
+            throw new KafkaException(e);
+        }
+
+        // the node is written with an open ACL
+        zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, json, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+    }
+
+    /**
+     * Request deletion of the given topic by creating a persistent node for it under
+     * {@code ZK_DELETE_TOPIC_PATH} in ZK. This method only writes the request node;
+     * it does not wait for the topic to be removed.
+     *
+     * @param topic the name of the topic to delete
+     * @throws ZkNodeExistsException presumably if a delete request for the topic already exists - verify against ZkClient
+     */
+    private void deleteTopic(String topic) throws ZkNodeExistsException {
+        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
+
+        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+    }
+
+    /**
+     * Add partitions to an existing topic by rewriting its assignment data under {@code ZK_TOPIC_PATH} in ZK.
+     * Every new partition gets exactly one replica, picked round-robin from the sorted broker list,
+     * continuing at the index that follows the existing partitions.
+     *
+     * @param topic              the name of the topic to extend
+     * @param numPartitions      the number of partitions to add on top of the existing ones
+     * @param existingAssignment the current partition to replicas assignment of the topic; it is not modified
+     * @throws KafkaException if the new assignment cannot be serialized to JSON
+     */
+    private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
+        // argument order must follow the placeholders: number of partitions first, then the topic
+        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", numPartitions, topic, existingAssignment);
+
+        // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
+        List<Integer> brokers = getBrokers();
+
+        int startIndex = existingAssignment.size();
+
+        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
+
+        for (int i = 0; i < numPartitions; i++) {
+            // wrap the list index around the number of brokers; applying the modulo to the looked-up
+            // broker id instead would index out of bounds and produce ids of non-existing brokers
+            newAssignment.put(i + startIndex, Collections.singletonList(brokers.get((i + startIndex) % brokers.size())));
+        }
+
+        // try to write to ZK with open ACL
+        try {
+            Map<String, Object> dataMap = new HashMap<>();
+            dataMap.put("version", 1);
+            dataMap.put("partitions", newAssignment);
+
+            ObjectMapper mapper = new ObjectMapper();
+            String data = mapper.writeValueAsString(dataMap);
+
+            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
+        } catch (JsonProcessingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Configure this assignor from the given configs.
+     * <p>
+     * The PartitionAssignor and its StreamThread need to be mutually accessible:
+     * the former needs the latter's cached metadata while sending subscriptions,
+     * and the latter needs the former's returned assignment when adding tasks.
+     *
+     * @param configs the configs; must contain the StreamThread instance
+     * @throws KafkaException if the StreamThread instance is missing or has the wrong type
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
+        final Object threadInstance = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
+
+        final KafkaException invalidThread;
+        if (threadInstance == null) {
+            invalidThread = new KafkaException("StreamThread is not specified");
+        } else if (!(threadInstance instanceof StreamThread)) {
+            invalidThread = new KafkaException(threadInstance.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
+        } else {
+            invalidThread = null;
+        }
+
+        if (invalidThread != null) {
+            log.error(invalidThread.getMessage(), invalidThread);
+            throw invalidThread;
+        }
+
+        // link the thread and this assignor in both directions
+        streamThread = (StreamThread) threadInstance;
+        streamThread.partitionAssignor(this);
+
+        this.topicGroups = streamThread.builder.topicGroups();
+
+        // the ZK client is only created when a ZK connect string has been configured
+        if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
+            String zkConnect = (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
+            zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+        }
+    }
+
+    /**
+     * @return the name of this assignor, which is always "stream"
+     */
+    @Override
+    public String name() {
+        return "stream";
+    }
+
+    /**
+     * Build the subscription for the given topics and attach the encoded {@link SubscriptionInfo} as user data.
+     *
+     * @param topics the topics to subscribe to
+     * @return the subscription carrying the topics and the encoded subscription info
+     */
+    @Override
+    public Subscription subscription(Set<String> topics) {
+        // The subscription info carries the following information
+        // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
+        // 2. Task ids of previously running tasks
+        // 3. Task ids of valid local states on the client's state directory.
+
+        Set<TaskId> previouslyRunning = streamThread.prevTasks();
+
+        // cached tasks that were not running previously are reported as standby tasks;
+        // note that this removes elements from the set returned by the thread
+        Set<TaskId> cachedNotRunning = streamThread.cachedTasks();
+        cachedNotRunning.removeAll(previouslyRunning);
+
+        SubscriptionInfo info = new SubscriptionInfo(streamThread.processId, previouslyRunning, cachedNotRunning);
+
+        return new Subscription(new ArrayList<>(topics), info.encode());
+    }
+
+    /**
+     * Assign tasks to the consumers of the group and, if a ZK client has been configured,
+     * make sure that the changelog and internal source topics exist with the expected number of partitions.
+     *
+     * @param metadata      the cluster metadata handed to the partition grouper
+     * @param subscriptions the subscriptions of all consumers, keyed by consumer id
+     * @return the assignment for every consumer, keyed by consumer id
+     */
+    @Override
+    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+        // This assigns tasks to consumer clients in two steps.
+        // 1. using TaskAssignor to assign tasks to consumer clients.
+        //    - Assign a task to a client which was running it previously.
+        //      If there is no such client, assign a task to a client which has its valid local state.
+        //    - A client may have more than one stream threads.
+        //      The assignor tries to assign tasks to a client proportionally to the number of threads.
+        //    - We try not to assign the same set of tasks to two different clients
+        //    We do the assignment in one-pass. The result may not satisfy above all.
+        // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
+        Map<UUID, Set<String>> consumersByClient = new HashMap<>();
+        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+
+        // Decode subscription info
+        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
+            String consumerId = entry.getKey();
+            Subscription subscription = entry.getValue();
+
+            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+
+            Set<String> consumers = consumersByClient.get(info.processId);
+            if (consumers == null) {
+                consumers = new HashSet<>();
+                consumersByClient.put(info.processId, consumers);
+            }
+            consumers.add(consumerId);
+
+            ClientState<TaskId> state = states.get(info.processId);
+            if (state == null) {
+                state = new ClientState<>();
+                states.put(info.processId, state);
+            }
+
+            state.prevActiveTasks.addAll(info.prevTasks);
+            state.prevAssignedTasks.addAll(info.prevTasks);
+            state.prevAssignedTasks.addAll(info.standbyTasks);
+            // every consumer of a client adds one unit of capacity to that client
+            state.capacity = state.capacity + 1d;
+        }
+
+        // get the tasks as partition groups from the partition grouper
+        Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
+        }
+        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
+
+        // add tasks to state topic subscribers
+        stateChangelogTopicToTaskIds = new HashMap<>();
+        internalSourceTopicToTaskIds = new HashMap<>();
+        for (TaskId task : partitionsForTask.keySet()) {
+            for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
+                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
+                if (tasks == null) {
+                    tasks = new HashSet<>();
+                    stateChangelogTopicToTaskIds.put(stateName, tasks);
+                }
+
+                tasks.add(task);
+            }
+
+            for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
+                Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
+                if (tasks == null) {
+                    tasks = new HashSet<>();
+                    internalSourceTopicToTaskIds.put(topicName, tasks);
+                }
+
+                tasks.add(task);
+            }
+        }
+
+        // assign tasks to clients
+        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+        Map<String, Assignment> assignment = new HashMap<>();
+
+        for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
+            UUID processId = entry.getKey();
+            Set<String> consumers = entry.getValue();
+            ClientState<TaskId> state = states.get(processId);
+
+            // list the active tasks first, followed by the remaining assigned (standby) tasks,
+            // so that an index below numActiveTasks identifies an active task
+            ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
+            final int numActiveTasks = state.activeTasks.size();
+            for (TaskId taskId : state.activeTasks) {
+                taskIds.add(taskId);
+            }
+            for (TaskId id : state.assignedTasks) {
+                if (!state.activeTasks.contains(id))
+                    taskIds.add(id);
+            }
+
+            final int numConsumers = consumers.size();
+            List<TaskId> active = new ArrayList<>();
+            Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
+
+            int i = 0;
+            for (String consumer : consumers) {
+                List<TopicPartition> activePartitions = new ArrayList<>();
+
+                final int numTaskIds = taskIds.size();
+                for (int j = i; j < numTaskIds; j += numConsumers) {
+                    TaskId taskId = taskIds.get(j);
+                    if (j < numActiveTasks) {
+                        for (TopicPartition partition : partitionsForTask.get(taskId)) {
+                            // the task id is added once per partition so that the two lists stay aligned by position
+                            activePartitions.add(partition);
+                            active.add(taskId);
+                        }
+                    } else {
+                        Set<TopicPartition> standbyPartitions = standby.get(taskId);
+                        if (standbyPartitions == null) {
+                            standbyPartitions = new HashSet<>();
+                            standby.put(taskId, standbyPartitions);
+                        }
+                        standbyPartitions.addAll(partitionsForTask.get(taskId));
+                    }
+                }
+
+                AssignmentInfo data = new AssignmentInfo(active, standby);
+                assignment.put(consumer, new Assignment(activePartitions, data.encode()));
+                i++;
+
+                // the collections are reused for the next consumer, after the info has been encoded
+                active.clear();
+                standby.clear();
+            }
+        }
+
+        // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
+        if (zkClient != null) {
+            log.debug("Starting to validate changelog topics in partition assignor.");
+
+            Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
+            topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
+            topicToTaskIds.putAll(internalSourceTopicToTaskIds);
+
+            for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
+                String topic = streamThread.jobId + "-" + entry.getKey();
+
+                // the expected number of partitions is the max value of TaskId.partition + 1
+                int numPartitions = 0;
+                for (TaskId task : entry.getValue()) {
+                    if (numPartitions < task.partition + 1)
+                        numPartitions = task.partition + 1;
+                }
+
+                boolean topicNotReady = true;
+
+                while (topicNotReady) {
+                    Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+
+                    // if topic does not exist, create it
+                    if (topicMetadata == null) {
+                        try {
+                            createTopic(topic, numPartitions);
+                        } catch (ZkNodeExistsException e) {
+                            // ignore and continue
+                        }
+                    } else {
+                        if (topicMetadata.size() > numPartitions) {
+                            // else if topic exists with more #.partitions than needed, delete in order to re-create it
+                            try {
+                                deleteTopic(topic);
+                            } catch (ZkNodeExistsException e) {
+                                // ignore and continue
+                            }
+                        } else if (topicMetadata.size() < numPartitions) {
+                            // else if topic exists with less #.partitions than needed, add partitions
+                            try {
+                                addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
+                            } catch (ZkNoNodeException e) {
+                                // ignore and continue
+                            }
+                        } else {
+                            // the topic is only ready once its partition count matches; after a delete or
+                            // an add request we loop again so that a deleted topic actually gets re-created
+                            // instead of waiting below for a topic that no longer exists
+                            topicNotReady = false;
+                        }
+                    }
+                }
+
+                // wait until the topic metadata has been propagated to all brokers
+                List<PartitionInfo> partitions;
+                do {
+                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                } while (partitions == null || partitions.size() != numPartitions);
+            }
+
+            log.info("Completed validating changelog topics in partition assignor.");
+        }
+
+        return assignment;
+    }
+
+    /**
+     * Record the received assignment: remember the standby tasks and map every assigned partition
+     * to the active task id found at the same position in the decoded {@link AssignmentInfo}.
+     *
+     * @param assignment the assignment received for this consumer
+     * @throws TaskAssignmentException if there are fewer active task ids than assigned partitions
+     */
+    @Override
+    public void onAssignment(Assignment assignment) {
+        List<TopicPartition> assignedPartitions = assignment.partitions();
+
+        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+        this.standbyTasks = info.standbyTasks;
+
+        Map<TopicPartition, Set<TaskId>> taskIdsByPartition = new HashMap<>();
+        Iterator<TaskId> activeTaskIter = info.activeTasks.iterator();
+        for (TopicPartition partition : assignedPartitions) {
+            Set<TaskId> taskIds = taskIdsByPartition.get(partition);
+            if (taskIds == null) {
+                taskIds = new HashSet<>();
+                taskIdsByPartition.put(partition, taskIds);
+            }
+
+            // partitions and active task ids are consumed in lock step
+            if (!activeTaskIter.hasNext()) {
+                TaskAssignmentException ex = new TaskAssignmentException(
+                        "failed to find a task id for the partition=" + partition.toString() +
+                        ", partitions=" + assignedPartitions.size() + ", assignmentInfo=" + info.toString()
+                );
+                log.error(ex.getMessage(), ex);
+                throw ex;
+            }
+
+            taskIds.add(activeTaskIter.next());
+        }
+        this.partitionToTaskIds = taskIdsByPartition;
+    }
+
+    /* For Test Only */
+    /**
+     * Look up the tasks recorded for the changelog topic of the given state, i.e. the state name
+     * with {@code ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX} appended.
+     *
+     * @param stateName the name of the state
+     * @return the task ids recorded for the state's changelog topic, or null if there are none
+     */
+    public Set<TaskId> tasksForState(String stateName) {
+        return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+    }
+
+    /**
+     * @param partition the topic partition to look up
+     * @return the task ids mapped to the given partition, or null if the partition is not in the map
+     */
+    public Set<TaskId> tasksForPartition(TopicPartition partition) {
+        return partitionToTaskIds.get(partition);
+    }
+
+    /**
+     * @return the standby tasks and their partitions; the internal map is returned directly, not a copy
+     */
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
+        return standbyTasks;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2e58ad5..ec3d011 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
@@ -67,8 +67,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param consumer              the instance of {@link Consumer}
      * @param producer              the instance of {@link Producer}
      * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
-     * @param config                the {@link StreamingConfig} specified by the user
-     * @param metrics               the {@link StreamingMetrics} created by the thread
+     * @param config                the {@link StreamsConfig} specified by the user
+     * @param metrics               the {@link StreamsMetrics} created by the thread
      */
     public StreamTask(TaskId id,
                       String jobId,
@@ -77,11 +77,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       Consumer<byte[], byte[]> consumer,
                       Producer<byte[], byte[]> producer,
                       Consumer<byte[], byte[]> restoreConsumer,
-                      StreamingConfig config,
-                      StreamingMetrics metrics) {
+                      StreamsConfig config,
+                      StreamsMetrics metrics) {
         super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
         this.punctuationQueue = new PunctuationQueue();
-        this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
         // create queues for each assigned partition and associate them
         // to corresponding source nodes in the processor topology
@@ -93,7 +93,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
             partitionQueues.put(partition, queue);
         }
 
-        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
         this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
         // initialize the consumed offset cache