You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/01/22 22:00:12 UTC

[4/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams

KAFKA-3136: Rename KafkaStreaming to KafkaStreams

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Gwen Shapira

Closes #800 from guozhangwang/KRename


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21c6cfe5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21c6cfe5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21c6cfe5

Branch: refs/heads/trunk
Commit: 21c6cfe50dbe818a392c28f48ce8891f7f99aaf6
Parents: 91ba074
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jan 22 13:00:00 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Jan 22 13:00:00 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreaming.java    | 167 ------
 .../org/apache/kafka/streams/KafkaStreams.java  | 171 +++++++
 .../java/org/apache/kafka/streams/KeyValue.java |  37 ++
 .../apache/kafka/streams/StreamingConfig.java   | 314 ------------
 .../apache/kafka/streams/StreamingMetrics.java  |  27 -
 .../org/apache/kafka/streams/StreamsConfig.java | 303 +++++++++++
 .../apache/kafka/streams/StreamsMetrics.java    |  27 +
 .../kafka/streams/examples/KStreamJob.java      |  24 +-
 .../kafka/streams/examples/ProcessorJob.java    |  34 +-
 .../apache/kafka/streams/kstream/KStream.java   |   1 +
 .../apache/kafka/streams/kstream/KTable.java    |   1 +
 .../apache/kafka/streams/kstream/KeyValue.java  |  34 --
 .../kstream/internals/KStreamAggregate.java     |   2 +-
 .../kstream/internals/KStreamFlatMap.java       |   2 +-
 .../streams/kstream/internals/KStreamImpl.java  |  10 +-
 .../kstream/internals/KStreamKStreamJoin.java   |   2 +-
 .../streams/kstream/internals/KStreamMap.java   |   2 +-
 .../kstream/internals/KStreamReduce.java        |   2 +-
 .../kstream/internals/KStreamTransform.java     |   2 +-
 .../streams/kstream/internals/KTableImpl.java   |   2 +-
 .../kstream/internals/KTableRepartitionMap.java |   2 +-
 .../internals/WindowedStreamPartitioner.java    |  52 --
 .../internals/WindowedStreamsPartitioner.java   |  52 ++
 .../streams/processor/ProcessorContext.java     |   6 +-
 .../streams/processor/StreamPartitioner.java    |  59 ---
 .../streams/processor/StreamsPartitioner.java   |  59 +++
 .../streams/processor/TopologyBuilder.java      |  76 +--
 .../processor/internals/AbstractTask.java       |   6 +-
 .../KafkaStreamingPartitionAssignor.java        | 483 ------------------
 .../internals/ProcessorContextImpl.java         |  12 +-
 .../processor/internals/RecordCollector.java    |   4 +-
 .../streams/processor/internals/SinkNode.java   |   6 +-
 .../processor/internals/StandbyContextImpl.java |  12 +-
 .../processor/internals/StandbyTask.java        |  12 +-
 .../internals/StreamPartitionAssignor.java      | 483 ++++++++++++++++++
 .../streams/processor/internals/StreamTask.java |  16 +-
 .../processor/internals/StreamThread.java       |  48 +-
 .../org/apache/kafka/streams/state/Entry.java   |  42 --
 .../kafka/streams/state/KeyValueIterator.java   |   4 +-
 .../kafka/streams/state/KeyValueStore.java      |   3 +-
 .../streams/state/WindowStoreIterator.java      |   2 +-
 .../InMemoryKeyValueStoreSupplier.java          |  12 +-
 .../InMemoryLRUCacheStoreSupplier.java          |  12 +-
 .../state/internals/MeteredKeyValueStore.java   |  14 +-
 .../state/internals/MeteredWindowStore.java     |   6 +-
 .../streams/state/internals/RocksDBStore.java   |  16 +-
 .../state/internals/RocksDBWindowStore.java     |   9 +-
 .../kafka/streams/StreamingConfigTest.java      |  75 ---
 .../apache/kafka/streams/StreamsConfigTest.java |  76 +++
 .../kstream/internals/KStreamFlatMapTest.java   |   2 +-
 .../internals/KStreamKTableLeftJoinTest.java    |   2 +-
 .../kstream/internals/KStreamMapTest.java       |   2 +-
 .../kstream/internals/KStreamTransformTest.java |   2 +-
 .../kstream/internals/KTableKTableJoinTest.java |   2 +-
 .../internals/KTableKTableLeftJoinTest.java     |   2 +-
 .../internals/KTableKTableOuterJoinTest.java    |   2 +-
 .../WindowedStreamPartitionerTest.java          |  84 ---
 .../WindowedStreamsPartitionerTest.java         |  84 +++
 .../KafkaStreamingPartitionAssignorTest.java    | 508 ------------------
 .../internals/ProcessorTopologyTest.java        |  27 +-
 .../processor/internals/StandbyTaskTest.java    |  31 +-
 .../internals/StreamPartitionAssignorTest.java  | 509 +++++++++++++++++++
 .../processor/internals/StreamTaskTest.java     |  27 +-
 .../processor/internals/StreamThreadTest.java   |  37 +-
 .../streams/state/KeyValueStoreTestDriver.java  |  45 +-
 .../internals/AbstractKeyValueStoreTest.java    |  22 +-
 .../state/internals/RocksDBWindowStoreTest.java |  38 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   4 +-
 .../apache/kafka/test/MockProcessorContext.java |  14 +-
 .../apache/kafka/test/NoOpKeyValueMapper.java   |   2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  32 +-
 71 files changed, 2133 insertions(+), 2168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
deleted file mode 100644
index 0d99739..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
- * sends output to zero or more output topics.
- * <p>
- * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify
- * the transformation.
- * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and
- * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
- * <p>
- * This streaming instance will co-ordinate with any other instances (whether in this same process, on other processes
- * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
- * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or
- * started in the appropriate processes to balance processing load.
- * <p>
- * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
- * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
- * <p>
- * A simple example might look like this:
- * <pre>
- *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
- *    props.put("bootstrap.servers", "localhost:4242");
- *    props.put("key.deserializer", StringDeserializer.class);
- *    props.put("value.deserializer", StringDeserializer.class);
- *    props.put("key.serializer", StringSerializer.class);
- *    props.put("value.serializer", IntegerSerializer.class);
- *    props.put("timestamp.extractor", MyTimestampExtractor.class);
- *    StreamingConfig config = new StreamingConfig(props);
- *
- *    KStreamBuilder builder = new KStreamBuilder();
- *    builder.from("topic1").mapValue(value -&gt; value.length()).to("topic2");
- *
- *    KafkaStreaming streaming = new KafkaStreaming(builder, config);
- *    streaming.start();
- * </pre>
- *
- */
-public class KafkaStreaming {
-
-    private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
-    private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
-    private static final String JMX_PREFIX = "kafka.streams";
-
-    // container states
-    private static final int CREATED = 0;
-    private static final int RUNNING = 1;
-    private static final int STOPPED = 2;
-    private int state = CREATED;
-
-    private final StreamThread[] threads;
-
-    // processId is expected to be unique across JVMs and to be used
-    // in userData of the subscription request to allow assignor be aware
-    // of the co-location of stream thread's consumers. It is for internal
-    // usage only and should not be exposed to users at all.
-    private final UUID processId;
-
-    public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
-        // create the metrics
-        Time time = new SystemTime();
-
-        this.processId = UUID.randomUUID();
-
-        String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG);
-        if (jobId.length() <= 0)
-            jobId = "kafka-streams";
-
-        String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
-        if (clientId.length() <= 0)
-            clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
-
-        List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
-        reporters.add(new JmxReporter(JMX_PREFIX));
-
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
-            .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                TimeUnit.MILLISECONDS);
-
-        Metrics metrics = new Metrics(metricConfig, reporters, time);
-
-        this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
-        for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
-        }
-    }
-
-    /**
-     * Start the stream process by starting all its threads
-     */
-    public synchronized void start() {
-        log.debug("Starting Kafka Stream process");
-
-        if (state == CREATED) {
-            for (StreamThread thread : threads)
-                thread.start();
-
-            state = RUNNING;
-
-            log.info("Started Kafka Stream process");
-        } else {
-            throw new IllegalStateException("This process was already started.");
-        }
-    }
-
-    /**
-     * Shutdown this stream process by signaling the threads to stop,
-     * wait for them to join and clean up the process instance.
-     */
-    public synchronized void close() {
-        log.debug("Stopping Kafka Stream process");
-
-        if (state == RUNNING) {
-            // signal the threads to stop and wait
-            for (StreamThread thread : threads)
-                thread.close();
-
-            for (StreamThread thread : threads) {
-                try {
-                    thread.join();
-                } catch (InterruptedException ex) {
-                    Thread.interrupted();
-                }
-            }
-
-            state = STOPPED;
-
-            log.info("Stopped Kafka Stream process");
-        } else {
-            throw new IllegalStateException("This process has not started yet.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
new file mode 100644
index 0000000..071cef6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
+ * sends output to zero or more output topics.
+ * <p>
+ * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify
+ * the transformation.
+ * The {@link KafkaStreams} instance will be responsible for the lifecycle of these processors. It will instantiate and
+ * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
+ * <p>
+ * This {@link KafkaStreams} instance will co-ordinate with any other instances (whether in this same process, on other processes
+ * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
+ * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or
+ * started in the appropriate processes to balance processing load.
+ * <p>
+ * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
+ * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
+ * <p>
+ * A simple example might look like this:
+ * <pre>
+ *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
+ *    props.put("bootstrap.servers", "localhost:4242");
+ *    props.put("key.deserializer", StringDeserializer.class);
+ *    props.put("value.deserializer", StringDeserializer.class);
+ *    props.put("key.serializer", StringSerializer.class);
+ *    props.put("value.serializer", IntegerSerializer.class);
+ *    props.put("timestamp.extractor", MyTimestampExtractor.class);
+ *    StreamsConfig config = new StreamsConfig(props);
+ *
+ *    KStreamBuilder builder = new KStreamBuilder();
+ *    builder.from("topic1").mapValue(value -&gt; value.length()).to("topic2");
+ *
+ *    KafkaStreams streams = new KafkaStreams(builder, config);
+ *    streams.start();
+ * </pre>
+ *
+ */
+public class KafkaStreams {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
+    private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.streams";
+
+    // container states
+    private static final int CREATED = 0;
+    private static final int RUNNING = 1;
+    private static final int STOPPED = 2;
+    private int state = CREATED;
+
+    private final StreamThread[] threads;
+
+    // processId is expected to be unique across JVMs and to be used
+    // in userData of the subscription request to allow assignor be aware
+    // of the co-location of stream thread's consumers. It is for internal
+    // usage only and should not be exposed to users at all.
+    private final UUID processId;
+
+    public KafkaStreams(TopologyBuilder builder, Properties props) {
+        this(builder, new StreamsConfig(props));
+    }
+
+    public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
+        // create the metrics
+        Time time = new SystemTime();
+
+        this.processId = UUID.randomUUID();
+
+        // JobId is a required config and hence should always have value
+        String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG);
+
+        String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
+        if (clientId.length() <= 0)
+            clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+
+        List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                MetricsReporter.class);
+        reporters.add(new JmxReporter(JMX_PREFIX));
+
+        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                TimeUnit.MILLISECONDS);
+
+        Metrics metrics = new Metrics(metricConfig, reporters, time);
+
+        this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        for (int i = 0; i < this.threads.length; i++) {
+            this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
+        }
+    }
+
+    /**
+     * Start the stream process by starting all its threads
+     */
+    public synchronized void start() {
+        log.debug("Starting Kafka Stream process");
+
+        if (state == CREATED) {
+            for (StreamThread thread : threads)
+                thread.start();
+
+            state = RUNNING;
+
+            log.info("Started Kafka Stream process");
+        } else {
+            throw new IllegalStateException("This process was already started.");
+        }
+    }
+
+    /**
+     * Shutdown this stream process by signaling the threads to stop,
+     * wait for them to join and clean up the process instance.
+     */
+    public synchronized void close() {
+        log.debug("Stopping Kafka Stream process");
+
+        if (state == RUNNING) {
+            // signal the threads to stop and wait
+            for (StreamThread thread : threads)
+                thread.close();
+
+            for (StreamThread thread : threads) {
+                try {
+                    thread.join();
+                } catch (InterruptedException ex) {
+                    Thread.interrupted();
+                }
+            }
+
+            state = STOPPED;
+
+            log.info("Stopped Kafka Stream process");
+        } else {
+            throw new IllegalStateException("This process has not started yet.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
new file mode 100644
index 0000000..472e677
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+public class KeyValue<K, V> {
+
+    public final K key;
+    public final V value;
+
+    public KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public static <K, V> KeyValue<K, V> pair(K key, V value) {
+        return new KeyValue<>(key, value);
+    }
+
+    public String toString() {
+        return "KeyValue(" + key + ", " + value + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
deleted file mode 100644
index e89d030..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-
-import java.util.Map;
-
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
-public class StreamingConfig extends AbstractConfig {
-
-    private static final ConfigDef CONFIG;
-
-    /** <code>state.dir</code> */
-    public static final String STATE_DIR_CONFIG = "state.dir";
-    private static final String STATE_DIR_DOC = "Directory location for state store.";
-
-    /** <code>zookeeper.connect<code/> */
-    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
-    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
-
-    /** <code>commit.interval.ms</code> */
-    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
-    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
-
-    /** <code>poll.ms</code> */
-    public static final String POLL_MS_CONFIG = "poll.ms";
-    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
-
-    /** <code>num.stream.threads</code> */
-    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
-    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
-
-    /** <code>num.stream.threads</code> */
-    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
-    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
-
-    /** <code>buffered.records.per.partition</code> */
-    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
-    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
-
-    /** <code>state.cleanup.delay</code> */
-    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
-    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
-
-    /** <code>total.records.to.process</code> */
-    public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
-    private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
-
-    /** <code>window.time.ms</code> */
-    public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms";
-    private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called "
-                                                     + "with this frequency even if there is no message.";
-
-    /** <code>timestamp.extractor</code> */
-    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
-    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
-
-    /** <code>partition.grouper</code> */
-    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
-    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
-
-    /** <code>job.id</code> */
-    public static final String JOB_ID_CONFIG = "job.id";
-    public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
-
-    /** <code>key.serializer</code> */
-    public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
-    /** <code>value.serializer</code> */
-    public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
-    /** <code>key.deserializer</code> */
-    public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-
-    /** <code>value.deserializer</code> */
-    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
-    /** <code>metrics.sample.window.ms</code> */
-    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
-
-    /** <code>metrics.num.samples</code> */
-    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
-
-    /** <code>metric.reporters</code> */
-    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
-
-    /** <code>bootstrap.servers</code> */
-    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-
-    /** <code>client.id</code> */
-    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
-
-    private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
-
-    static {
-        CONFIG = new ConfigDef().define(JOB_ID_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.MEDIUM,
-                                        StreamingConfig.JOB_ID_DOC)
-                                .define(CLIENT_ID_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.CLIENT_ID_DOC)
-                                .define(ZOOKEEPER_CONNECT_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.HIGH,
-                                        StreamingConfig.ZOOKEEPER_CONNECT_DOC)
-                                .define(STATE_DIR_CONFIG,
-                                        Type.STRING,
-                                        SYSTEM_TEMP_DIRECTORY,
-                                        Importance.MEDIUM,
-                                        STATE_DIR_DOC)
-                                .define(COMMIT_INTERVAL_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        Importance.HIGH,
-                                        COMMIT_INTERVAL_MS_DOC)
-                                .define(POLL_MS_CONFIG,
-                                        Type.LONG,
-                                        100,
-                                        Importance.LOW,
-                                        POLL_MS_DOC)
-                                .define(NUM_STREAM_THREADS_CONFIG,
-                                        Type.INT,
-                                        1,
-                                        Importance.LOW,
-                                        NUM_STREAM_THREADS_DOC)
-                                .define(NUM_STANDBY_REPLICAS_CONFIG,
-                                        Type.INT,
-                                        0,
-                                        Importance.LOW,
-                                        NUM_STANDBY_REPLICAS_DOC)
-                                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
-                                        Type.INT,
-                                        1000,
-                                        Importance.LOW,
-                                        BUFFERED_RECORDS_PER_PARTITION_DOC)
-                                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
-                                        Type.LONG,
-                                        60000,
-                                        Importance.LOW,
-                                        STATE_CLEANUP_DELAY_MS_DOC)
-                                .define(TOTAL_RECORDS_TO_PROCESS,
-                                        Type.LONG,
-                                        -1L,
-                                        Importance.LOW,
-                                        TOTAL_RECORDS_TO_DOC)
-                                .define(WINDOW_TIME_MS_CONFIG,
-                                        Type.LONG,
-                                        -1L,
-                                        Importance.MEDIUM,
-                                        WINDOW_TIME_MS_DOC)
-                                .define(KEY_SERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
-                                .define(KEY_DESERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
-                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
-                                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
-                                .define(PARTITION_GROUPER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        DefaultPartitionGrouper.class,
-                                        Importance.HIGH,
-                                        PARTITION_GROUPER_CLASS_DOC)
-                                .define(BOOTSTRAP_SERVERS_CONFIG,
-                                        Type.STRING,
-                                        Importance.HIGH,
-                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG,
-                                        Type.LIST,
-                                        "",
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
-                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
-                                .define(METRICS_NUM_SAMPLES_CONFIG,
-                                        Type.INT,
-                                        2,
-                                        atLeast(1),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
-    }
-
-    public static class InternalConfig {
-        public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
-    }
-
-    public StreamingConfig(Map<?, ?> props) {
-        super(CONFIG, props);
-    }
-
-    public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
-        Map<String, Object> props = getBaseConsumerConfigs();
-
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
-        props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
-        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
-
-        props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
-
-        return props;
-    }
-
-    public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
-        Map<String, Object> props = getBaseConsumerConfigs();
-
-        // no need to set group id for a restore consumer
-        props.remove(ConsumerConfig.GROUP_ID_CONFIG);
-
-        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
-
-        return props;
-    }
-
-    private Map<String, Object> getBaseConsumerConfigs() {
-        Map<String, Object> props = this.originals();
-
-        // set consumer default property values
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        // remove properties that are not required for consumers
-        props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-        props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
-        return props;
-    }
-
-    public Map<String, Object> getProducerConfigs(String clientId) {
-        Map<String, Object> props = this.originals();
-
-        // set producer default property values
-        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-
-        // remove properties that are not required for producers
-        props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-
-        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
-
-        return props;
-    }
-
-    public Serializer keySerializer() {
-        return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-    }
-
-    public Serializer valueSerializer() {
-        return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-    }
-
-    public Deserializer keyDeserializer() {
-        return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-    }
-
-    public Deserializer valueDeserializer() {
-        return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-    }
-
-    public static void main(String[] args) {
-        System.out.println(CONFIG.toHtmlTable());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
deleted file mode 100644
index ebf80b3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.Sensor;
-
-public interface StreamingMetrics {
-
-    Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
-
-    void recordLatency(Sensor sensor, long startNs, long endNs);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
new file mode 100644
index 0000000..3843b1d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class StreamsConfig extends AbstractConfig {
+
+    private static final ConfigDef CONFIG;
+
+    /** <code>state.dir</code> */
+    public static final String STATE_DIR_CONFIG = "state.dir";
+    private static final String STATE_DIR_DOC = "Directory location for state store.";
+
+    /** <code>zookeeper.connect<code/> */
+    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
+    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
+
+    /** <code>commit.interval.ms</code> */
+    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
+    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
+
+    /** <code>poll.ms</code> */
+    public static final String POLL_MS_CONFIG = "poll.ms";
+    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
+
+    /** <code>num.stream.threads</code> */
+    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
+    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+
+    /** <code>num.stream.threads</code> */
+    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
+    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
+
+    /** <code>buffered.records.per.partition</code> */
+    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
+    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
+
+    /** <code>state.cleanup.delay</code> */
+    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
+    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
+
+    /** <code>total.records.to.process</code> */
+    public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
+    private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
+
+    /** <code>timestamp.extractor</code> */
+    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
+    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+
+    /** <code>partition.grouper</code> */
+    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
+    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
+
+    /** <code>job.id</code> */
+    public static final String JOB_ID_CONFIG = "job.id";
+    public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
+
+    /** <code>key.serializer</code> */
+    public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+
+    /** <code>value.serializer</code> */
+    public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+    /** <code>key.deserializer</code> */
+    public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+
+    /** <code>value.deserializer</code> */
+    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+    /** <code>metrics.sample.window.ms</code> */
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+
+    /** <code>metrics.num.samples</code> */
+    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+
+    /** <code>metric.reporters</code> */
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+
+    /** <code>bootstrap.servers</code> */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+    /** <code>client.id</code> */
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
+    private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
+
+    static {
+        CONFIG = new ConfigDef().define(JOB_ID_CONFIG,      // required with no default value
+                                        Type.STRING,
+                                        Importance.HIGH,
+                                        StreamsConfig.JOB_ID_DOC)
+                                .define(BOOTSTRAP_SERVERS_CONFIG,       // required with no default value
+                                        Type.STRING,
+                                        Importance.HIGH,
+                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+                                .define(CLIENT_ID_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.HIGH,
+                                        CommonClientConfigs.CLIENT_ID_DOC)
+                                .define(ZOOKEEPER_CONNECT_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.HIGH,
+                                        StreamsConfig.ZOOKEEPER_CONNECT_DOC)
+                                .define(STATE_DIR_CONFIG,
+                                        Type.STRING,
+                                        SYSTEM_TEMP_DIRECTORY,
+                                        Importance.HIGH,
+                                        STATE_DIR_DOC)
+                                .define(KEY_SERIALIZER_CLASS_CONFIG,        // required with no default value
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG,      // required with no default value
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
+                                .define(KEY_DESERIALIZER_CLASS_CONFIG,      // required with no default value
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
+                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,    // required with no default value
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.MEDIUM,
+                                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
+                                .define(PARTITION_GROUPER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        DefaultPartitionGrouper.class,
+                                        Importance.MEDIUM,
+                                        PARTITION_GROUPER_CLASS_DOC)
+                                .define(COMMIT_INTERVAL_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        Importance.LOW,
+                                        COMMIT_INTERVAL_MS_DOC)
+                                .define(POLL_MS_CONFIG,
+                                        Type.LONG,
+                                        100,
+                                        Importance.LOW,
+                                        POLL_MS_DOC)
+                                .define(NUM_STREAM_THREADS_CONFIG,
+                                        Type.INT,
+                                        1,
+                                        Importance.LOW,
+                                        NUM_STREAM_THREADS_DOC)
+                                .define(NUM_STANDBY_REPLICAS_CONFIG,
+                                        Type.INT,
+                                        0,
+                                        Importance.LOW,
+                                        NUM_STANDBY_REPLICAS_DOC)
+                                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+                                        Type.INT,
+                                        1000,
+                                        Importance.LOW,
+                                        BUFFERED_RECORDS_PER_PARTITION_DOC)
+                                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
+                                        Type.LONG,
+                                        60000,
+                                        Importance.LOW,
+                                        STATE_CLEANUP_DELAY_MS_DOC)
+                                .define(TOTAL_RECORDS_TO_PROCESS,
+                                        Type.LONG,
+                                        -1L,
+                                        Importance.LOW,
+                                        TOTAL_RECORDS_TO_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        "",
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG,
+                                        Type.INT,
+                                        2,
+                                        atLeast(1),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
+    }
+
+    public static class InternalConfig {
+        public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
+    }
+
+    public StreamsConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+    public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
+        Map<String, Object> props = getBaseConsumerConfigs();
+
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
+
+        props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+
+        return props;
+    }
+
+    public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
+        Map<String, Object> props = getBaseConsumerConfigs();
+
+        // no need to set group id for a restore consumer
+        props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+
+        return props;
+    }
+
+    private Map<String, Object> getBaseConsumerConfigs() {
+        Map<String, Object> props = this.originals();
+
+        // set consumer default property values
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        // remove properties that are not required for consumers
+        props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+        props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
+        return props;
+    }
+
+    public Map<String, Object> getProducerConfigs(String clientId) {
+        Map<String, Object> props = this.originals();
+
+        // set producer default property values
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+        // remove properties that are not required for producers
+        props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
+
+        return props;
+    }
+
+    public Serializer keySerializer() {
+        return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+    }
+
+    public Serializer valueSerializer() {
+        return getConfiguredInstance(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+    }
+
+    public Deserializer keyDeserializer() {
+        return getConfiguredInstance(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    }
+
+    public Deserializer valueDeserializer() {
+        return getConfiguredInstance(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    }
+
+    public static void main(String[] args) {
+        System.out.println(CONFIG.toHtmlTable());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
new file mode 100644
index 0000000..a151392
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+public interface StreamsMetrics {
+
+    Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
+
+    void recordLatency(Sensor sensor, long startNs, long endNs);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index 88a8955..a234395 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -20,11 +20,11 @@ package org.apache.kafka.streams.examples;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 
@@ -34,14 +34,14 @@ public class KStreamJob {
 
     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
-        props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream");
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamingConfig config = new StreamingConfig(props);
+        props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        StreamsConfig config = new StreamsConfig(props);
 
         KStreamBuilder builder = new KStreamBuilder();
 
@@ -78,7 +78,7 @@ public class KStreamJob {
         streams[0].to("topic2");
         streams[1].to("topic3");
 
-        KafkaStreaming kstream = new KafkaStreaming(builder, config);
+        KafkaStreams kstream = new KafkaStreams(builder, config);
         kstream.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 2d0b79f..e17c16b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -21,13 +21,13 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Entry;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -70,11 +70,11 @@ public class ProcessorJob {
                     KeyValueIterator<String, Integer> iter = this.kvStore.all();
 
                     while (iter.hasNext()) {
-                        Entry<String, Integer> entry = iter.next();
+                        KeyValue<String, Integer> entry = iter.next();
 
-                        System.out.println("[" + entry.key() + ", " + entry.value() + "]");
+                        System.out.println("[" + entry.key + ", " + entry.value + "]");
 
-                        context.forward(entry.key(), entry.value());
+                        context.forward(entry.key, entry.value);
                     }
 
                     iter.close();
@@ -90,15 +90,15 @@ public class ProcessorJob {
 
     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
-        props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor");
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamingConfig config = new StreamingConfig(props);
+        props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        StreamsConfig config = new StreamsConfig(props);
 
         TopologyBuilder builder = new TopologyBuilder();
 
@@ -109,7 +109,7 @@ public class ProcessorJob {
 
         builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
 
-        KafkaStreaming streaming = new KafkaStreaming(builder, config);
-        streaming.start();
+        KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index dfed661..26f04f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 87298d1..feb28ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
 
 /**
  * KTable is an abstraction of a change log stream.

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
deleted file mode 100644
index f633f6e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class KeyValue<K, V> {
-
-    public final K key;
-    public final V value;
-
-    public KeyValue(K key, V value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    public static <K, V> KeyValue<K, V> pair(K key, V value) {
-        return new KeyValue<>(key, value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 91bfa9e..26002f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index daef8b1..ff7f9ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ce89220..98e50c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.Serdes;
 
@@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
         String name = topology.newName(SINK_NAME);
-        StreamPartitioner<K, V> streamPartitioner = null;
+        StreamsPartitioner<K, V> streamsPartitioner = null;
 
         if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+            streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer);
         }
 
-        topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
+        topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 01e3325..a4ac9b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 57f1431..a40449b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 7d6eb27..c484c7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index a9d8f97..4299c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.Processor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 8ee557c..d046090 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Reducer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 12fcc17..499f721 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
deleted file mode 100644
index 10e69cc..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-
-public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
-
-    private final WindowedSerializer<K> serializer;
-
-    public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
-        this.serializer = serializer;
-    }
-
-    /**
-     * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
-     * and the current number of partitions. The partition number id determined by the original key of the windowed key
-     * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
-     *
-     * @param windowedKey the key of the message
-     * @param value the value of the message
-     * @param numPartitions the total number of partitions
-     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
-     */
-    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
-        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
-
-        // hash the keyBytes to choose a partition
-        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
-    }
-
-    private static int toPositive(int number) {
-        return number & 0x7fffffff;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
new file mode 100644
index 0000000..ff1fa2c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
+
+public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> {
+
+    private final WindowedSerializer<K> serializer;
+
+    public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) {
+        this.serializer = serializer;
+    }
+
+    /**
+     * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value
+     * and the current number of partitions. The partition number id determined by the original key of the windowed key
+     * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
+     *
+     * @param windowedKey the key of the message
+     * @param value the value of the message
+     * @param numPartitions the total number of partitions
+     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+     */
+    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
+        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+
+        // hash the keyBytes to choose a partition
+        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+    }
+
+    private static int toPositive(int number) {
+        return number & 0x7fffffff;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index fa19ed7..41e2235 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsMetrics;
 
 import java.io.File;
 
@@ -70,9 +70,9 @@ public interface ProcessorContext {
     /**
      * Returns Metrics instance
      *
-     * @return StreamingMetrics
+     * @return StreamsMetrics
      */
-    StreamingMetrics metrics();
+    StreamsMetrics metrics();
 
     /**
      * Registers and possibly restores the specified storage engine.