You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/23 00:25:43 UTC

[1/2] kafka git commit: KAFKA-3066: Demo Examples for Kafka Streams

Repository: kafka
Updated Branches:
  refs/heads/trunk a19729fe6 -> c197113a9


http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
new file mode 100644
index 0000000..60b3b96
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+public class WallclockTimestampExtractor implements TimestampExtractor {
+    @Override
+    public long extract(ConsumerRecord<Object, Object> record) {
+        return System.currentTimeMillis();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 777fae5..b2af904 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
+import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
new file mode 100644
index 0000000..1b8cbb8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowedStreamPartitionerTest {
+
+    private String topicName = "topic";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+
+    private List<PartitionInfo> infos = Arrays.asList(
+            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
+    );
+
+    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+    @Test
+    public void testCopartitioning() {
+
+        Random rand = new Random();
+
+        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
+
+        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
+
+        for (int k = 0; k < 10; k++) {
+            Integer key = rand.nextInt();
+            byte[] keyBytes = keySerializer.serialize(topicName, key);
+
+            String value = key.toString();
+            byte[] valueBytes = valSerializer.serialize(topicName, value);
+
+            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
+
+            for (int w = 0; w < 10; w++) {
+                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
+
+                Windowed<Integer> windowedKey = new Windowed<>(key, window);
+                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
+
+                assertEquals(expected, actual);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
deleted file mode 100644
index 18494fd..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-public class WindowedStreamsPartitionerTest {
-
-    private String topicName = "topic";
-
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-
-    private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
-    @Test
-    public void testCopartitioning() {
-
-        Random rand = new Random();
-
-        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
-
-        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
-        WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer);
-
-        for (int k = 0; k < 10; k++) {
-            Integer key = rand.nextInt();
-            byte[] keyBytes = keySerializer.serialize(topicName, key);
-
-            String value = key.toString();
-            byte[] valueBytes = valSerializer.serialize(topicName, value);
-
-            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
-
-            for (int w = 0; w < 10; w++) {
-                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
-
-                Windowed<Integer> windowedKey = new Windowed<>(key, window);
-                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
-
-                assertEquals(expected, actual);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 60bd309..cb6ea05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -194,8 +194,8 @@ public class ProcessorTopologyTest {
         assertNull(driver.readOutput(topic));
     }
 
-    protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) {
-        return new StreamsPartitioner<K, V>() {
+    protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
+        return new StreamPartitioner<K, V>() {
             @Override
             public Integer partition(K key, V value, int numPartitions) {
                 return partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index fd604b6..ffcf9ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -93,8 +93,8 @@ public class StandbyTaskTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
             }
@@ -200,7 +200,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(baseDir, taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
 
@@ -298,7 +298,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(baseDir, taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2d531bc..039cb96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -59,8 +59,9 @@ import java.util.UUID;
 
 public class StreamThreadTest {
 
-    private String clientId = "clientId";
-    private UUID processId = UUID.randomUUID();
+    private final String clientId = "clientId";
+    private final String jobId = "stream-thread-test";
+    private final UUID processId = UUID.randomUUID();
 
     private TopicPartition t1p1 = new TopicPartition("topic1", 1);
     private TopicPartition t1p2 = new TopicPartition("topic1", 2);
@@ -117,8 +118,8 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             }
         };
@@ -128,13 +129,14 @@ public class StreamThreadTest {
         public boolean committed = false;
 
         public TestStreamTask(TaskId id,
+                              String jobId,
                               Collection<TopicPartition> partitions,
                               ProcessorTopology topology,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config) {
-            super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
+            super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -161,11 +163,11 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -264,10 +266,12 @@ public class StreamThreadTest {
 
             StreamsConfig config = new StreamsConfig(props);
 
-            File stateDir1 = new File(baseDir, task1.toString());
-            File stateDir2 = new File(baseDir, task2.toString());
-            File stateDir3 = new File(baseDir, task3.toString());
-            File extraDir = new File(baseDir, "X");
+            File jobDir = new File(baseDir, jobId);
+            jobDir.mkdir();
+            File stateDir1 = new File(jobDir, task1.toString());
+            File stateDir2 = new File(jobDir, task2.toString());
+            File stateDir3 = new File(jobDir, task3.toString());
+            File extraDir = new File(jobDir, "X");
             stateDir1.mkdir();
             stateDir2.mkdir();
             stateDir3.mkdir();
@@ -281,7 +285,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -290,7 +294,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -403,7 +407,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -412,7 +416,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 36e487b..1e9c3ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
@@ -249,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
-                                    StreamsPartitioner<K1, V1> partitioner) {
+                                    StreamPartitioner<K1, V1> partitioner) {
                 recordFlushed(record.key(), record.value());
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 8f8e00f..2dc567e 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -130,7 +130,7 @@ public class KStreamTestDriver {
         
         @Override
         public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                                StreamsPartitioner<K, V> partitioner) {
+                                StreamPartitioner<K, V> partitioner) {
             // The serialization is skipped.
             process(record.topic(), record.key(), record.value());
         }


[2/2] kafka git commit: KAFKA-3066: Demo Examples for Kafka Streams

Posted by ew...@apache.org.
KAFKA-3066: Demo Examples for Kafka Streams

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #797 from guozhangwang/K3066


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c197113a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c197113a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c197113a

Branch: refs/heads/trunk
Commit: c197113a9c04e2f6c2d1a72161c0d40d5804490e
Parents: a19729f
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jan 22 15:25:24 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Jan 22 15:25:24 2016 -0800

----------------------------------------------------------------------
 build.gradle                                    |  25 ++++
 settings.gradle                                 |   2 +-
 .../examples/pageview/JsonPOJODeserializer.java |  66 ++++++++++
 .../examples/pageview/JsonPOJOSerializer.java   |  60 +++++++++
 .../examples/pageview/PageViewTypedJob.java     | 127 +++++++++++++++++++
 .../examples/pageview/PageViewUnTypedJob.java   | 107 ++++++++++++++++
 .../kafka/streams/examples/pipe/PipeJob.java    |  50 ++++++++
 .../examples/wordcount/WordCountJob.java        | 103 +++++++++++++++
 .../wordcount/WordCountProcessorJob.java        | 121 ++++++++++++++++++
 .../org/apache/kafka/streams/StreamsConfig.java |   6 +-
 .../kafka/streams/examples/KStreamJob.java      |  84 ------------
 .../kafka/streams/examples/ProcessorJob.java    | 115 -----------------
 .../examples/WallclockTimestampExtractor.java   |  28 ----
 .../org/apache/kafka/streams/kstream/Count.java |   6 +-
 .../streams/kstream/KeyValueToDoubleMapper.java |  23 ----
 .../streams/kstream/KeyValueToIntMapper.java    |  23 ----
 .../streams/kstream/KeyValueToLongMapper.java   |  23 ----
 .../kafka/streams/kstream/TumblingWindows.java  |   8 +-
 .../kafka/streams/kstream/UnlimitedWindows.java |  10 +-
 .../streams/kstream/internals/KStreamImpl.java  |   8 +-
 .../internals/WindowedStreamPartitioner.java    |  52 ++++++++
 .../internals/WindowedStreamsPartitioner.java   |  52 --------
 .../streams/processor/StreamPartitioner.java    |  59 +++++++++
 .../streams/processor/StreamsPartitioner.java   |  59 ---------
 .../streams/processor/TopologyBuilder.java      |  26 ++--
 .../processor/internals/AbstractTask.java       |   3 +-
 .../processor/internals/RecordCollector.java    |   4 +-
 .../streams/processor/internals/SinkNode.java   |   6 +-
 .../processor/internals/StreamThread.java       |  20 ++-
 .../internals/WallclockTimestampExtractor.java  |  28 ++++
 .../apache/kafka/streams/StreamsConfigTest.java |   2 +-
 .../WindowedStreamPartitionerTest.java          |  84 ++++++++++++
 .../WindowedStreamsPartitionerTest.java         |  84 ------------
 .../internals/ProcessorTopologyTest.java        |   6 +-
 .../processor/internals/StandbyTaskTest.java    |   6 +-
 .../processor/internals/StreamThreadTest.java   |  32 +++--
 .../streams/state/KeyValueStoreTestDriver.java  |   4 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   4 +-
 38 files changed, 972 insertions(+), 554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 150cac7..0b1dcc4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -514,6 +514,7 @@ project(':streams') {
 
     dependencies {
         compile project(':clients')
+        compile project(':connect:json')  // this dependency should be removed after we unify data API
         compile libs.slf4jlog4j
         compile libs.rocksDBJni
         compile libs.zkclient // this dependency should be removed after KIP-4
@@ -542,6 +543,30 @@ project(':streams') {
     }
 }
 
+project(':streams:examples') {
+  archivesBaseName = "kafka-streams-examples"
+
+  dependencies {
+    compile project(':streams')
+    compile project(':connect:json')  // this dependency should be removed after we unify data API
+  }
+
+  javadoc {
+    enabled = false
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.runtime) {
+      exclude('kafka-streams*')
+    }
+    into "$buildDir/dependant-libs-${versions.scala}"
+  }
+
+  jar {
+    dependsOn 'copyDependantLibs'
+  }
+}
+
 project(':log4j-appender') {
   archivesBaseName = "kafka-log4j-appender"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 097c43b..d430c2f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
         'connect:api', 'connect:runtime', 'connect:json', 'connect:file'

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
new file mode 100644
index 0000000..583ec2d
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+
+/**
+ * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
+ * structured data without having associated Java classes. This deserializer also supports Connect schemas.
+ */
+public class JsonPOJODeserializer<T> implements Deserializer<T> {
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    private Class<T> tClass;
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonPOJODeserializer() {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> props, boolean isKey) {
+        tClass = (Class<T>) props.get("JsonPOJOClass");
+    }
+
+    @Override
+    public T deserialize(String topic, byte[] bytes) {
+        if (bytes == null)
+            return null;
+
+        T data;
+        try {
+            data = objectMapper.readValue(bytes, tClass);
+        } catch (Exception e) {
+            throw new SerializationException(e);
+        }
+
+        return data;
+    }
+
+    @Override
+    public void close() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
new file mode 100644
index 0000000..bb60327
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.streams.examples.pageview;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+
+public class JsonPOJOSerializer<T> implements Serializer<T> {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private Class<T> tClass;
+
+    /**
+     * Default constructor needed by Kafka
+     */
+    public JsonPOJOSerializer() {
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> props, boolean isKey) {
+        tClass = (Class<T>) props.get("JsonPOJOClass");
+    }
+
+    @Override
+    public byte[] serialize(String topic, T data) {
+        if (data == null)
+            return null;
+
+        try {
+            return objectMapper.writeValueAsBytes(data);
+        } catch (Exception e) {
+            throw new SerializationException("Error serializing JSON message", e);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
new file mode 100644
index 0000000..c064848
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+public class PageViewTypedJob {
+
+    // POJO classes
+    static public class PageView {
+        public String user;
+        public String page;
+    }
+
+    static public class UserProfile {
+        public String user;
+        public String region;
+    }
+
+    static public class PageViewByRegion {
+        public String user;
+        public String page;
+        public String region;
+    }
+
+    static public class WindowedPageViewByRegion {
+        public long windowStart;
+        public String region;
+    }
+
+    static public class RegionCount {
+        public long count;
+        public String region;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+
+
+        KStream<String, PageView> views = builder.stream("streams-pageview-input");
+
+        KStream<String, PageView> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record));
+
+        KTable<String, UserProfile> users = builder.table("streams-userprofile-input");
+
+        KStream<WindowedPageViewByRegion, RegionCount> regionCount = viewsByUser
+                .leftJoin(users, (view, profile) -> {
+                    PageViewByRegion viewByRegion = new PageViewByRegion();
+                    viewByRegion.user = view.user;
+                    viewByRegion.page = view.page;
+                    viewByRegion.region = profile.region;
+
+                    return viewByRegion;
+                })
+                .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
+                .aggregateByKey(new Count<String, PageViewByRegion>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, longSerializer,
+                        stringDeserializer, longDeserializer)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+                    @Override
+                    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+                        WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+                        wViewByRegion.windowStart = key.window().start();
+                        wViewByRegion.region = key.value();
+
+                        RegionCount rCount = new RegionCount();
+                        rCount.region = key.value();
+                        rCount.count = value;
+
+                        return new KeyValue<>(wViewByRegion, rCount);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());
+
+        KafkaStreams kstream = new KafkaStreams(builder, props);
+        kstream.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
new file mode 100644
index 0000000..1ae02c9
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+public class PageViewUnTypedJob {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+
+        StreamsConfig config = new StreamsConfig(props);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+
+
+        KStream<String, JsonNode> views = builder.stream("streams-pageview-input");
+
+        KStream<String, JsonNode> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").textValue(), record));
+
+        KTable<String, JsonNode> users = builder.table("streams-userprofile-input");
+
+        KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
+
+        KStream<JsonNode, JsonNode> regionCount = viewsByUser
+                .leftJoin(userRegions, (view, region) -> {
+                    ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                    return (JsonNode) jNode.put("user", view.get("user").textValue())
+                            .put("page", view.get("page").textValue())
+                            .put("region", region);
+                })
+                .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
+                .aggregateByKey(new Count<String, JsonNode>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
+                        stringSerializer, longSerializer,
+                        stringDeserializer, longDeserializer)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+                    @Override
+                    public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+                        ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+                        keyNode.put("window-start", key.window().start())
+                                .put("region", key.window().start());
+
+                        ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+                        keyNode.put("count", value);
+
+                        return new KeyValue<JsonNode, JsonNode>((JsonNode) keyNode, (JsonNode) valueNode);
+                    }
+                });
+
+        // write to the result topic
+        regionCount.to("streams-pageviewstats-output");
+
+        KafkaStreams kstream = new KafkaStreams(builder, config);
+        kstream.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
new file mode 100644
index 0000000..4a4f97f
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pipe;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Properties;
+
+public class PipeJob {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // can specify underlying client configs if necessary
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        builder.stream("streams-file-input").to("streams-pipe-output");
+
+        KafkaStreams kstream = new KafkaStreams(builder, props);
+        kstream.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
new file mode 100644
index 0000000..8aa15a4
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Count;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class WordCountJob {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // can specify underlying client configs if necessary
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+        final Serializer<Long> longSerializer = new LongSerializer();
+        final Deserializer<Long> longDeserializer = new LongDeserializer();
+        final Serializer<JsonNode> JsonSerializer = new JsonSerializer();
+
+        KStream<String, String> source = builder.stream("streams-file-input");
+
+        KStream<String, JsonNode> counts = source
+                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        return Arrays.asList(value.toLowerCase().split(" "));
+                    }
+                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                    @Override
+                    public KeyValue<String, String> apply(String key, String value) {
+                        return new KeyValue<String, String>(value, value);
+                    }
+                })
+                .aggregateByKey(new Count<>(), UnlimitedWindows.of("Counts").startOn(0L),
+                        stringSerializer, longSerializer,
+                        stringDeserializer, longDeserializer)
+                .toStream()
+                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<String, JsonNode>>() {
+                    @Override
+                    public KeyValue<String, JsonNode> apply(Windowed<String> key, Long value) {
+                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                        jNode.put("word", key.value())
+                             .put("count", value);
+
+                        return new KeyValue<String, JsonNode>(null, jNode);
+                    }
+                });
+
+        counts.to("streams-wordcount-output", stringSerializer, JsonSerializer);
+
+        KafkaStreams kstream = new KafkaStreams(builder, props);
+        kstream.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
new file mode 100644
index 0000000..63692bd
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Properties;
+
+public class WordCountProcessorJob {
+
+    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+
+        @Override
+        public Processor<String, String> get() {
+            return new Processor<String, String>() {
+                private ProcessorContext context;
+                private KeyValueStore<String, Integer> kvStore;
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public void init(ProcessorContext context) {
+                    this.context = context;
+                    this.context.schedule(1000);
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+                }
+
+                @Override
+                public void process(String dummy, String line) {
+                    String words[] = line.toLowerCase().split(" ");
+
+                    for (String word : words) {
+                        Integer oldValue = this.kvStore.get(word);
+
+                        if (oldValue == null) {
+                            this.kvStore.put(word, 1);
+                        } else {
+                            this.kvStore.put(word, oldValue + 1);
+                        }
+                    }
+
+                    context.commit();
+                }
+
+                @Override
+                public void punctuate(long timestamp) {
+                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+                    System.out.println("----------- " + timestamp + "----------- ");
+
+                    while (iter.hasNext()) {
+                        KeyValue<String, Integer> entry = iter.next();
+
+                        System.out.println("[" + entry.key + ", " + entry.value + "]");
+
+                        context.forward(entry.key, entry.value.toString());
+                    }
+
+                    iter.close();
+                }
+
+                @Override
+                public void close() {
+                    this.kvStore.close();
+                }
+            };
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        // can specify underlying client configs if necessary
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("Source", "streams-file-input");
+
+        builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
+        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
+
+        builder.addSink("Sink", "streams-wordcount-output", "Process");
+
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 3843b1d..16bb06a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -113,7 +113,7 @@ public class StreamsConfig extends AbstractConfig {
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
-    private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
+    private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor";
 
     static {
         CONFIG = new ConfigDef().define(JOB_ID_CONFIG,      // required with no default value
@@ -136,8 +136,8 @@ public class StreamsConfig extends AbstractConfig {
                                         StreamsConfig.ZOOKEEPER_CONNECT_DOC)
                                 .define(STATE_DIR_CONFIG,
                                         Type.STRING,
-                                        SYSTEM_TEMP_DIRECTORY,
-                                        Importance.HIGH,
+                                        "/tmp/kafka-streams",
+                                        Importance.MEDIUM,
                                         STATE_DIR_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG,        // required with no default value
                                         Type.CLASS,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
deleted file mode 100644
index a234395..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
-
-import java.util.Properties;
-
-public class KStreamJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamsConfig config = new StreamsConfig(props);
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<String, String> stream1 = builder.stream("topic1");
-
-        KStream<String, Integer> stream2 =
-            stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(String key, String value) {
-                    return new KeyValue<>(key, new Integer(value));
-                }
-            }).filter(new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return true;
-                }
-            });
-
-        KStream<String, Integer>[] streams = stream2.branch(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return (value % 2) == 0;
-                }
-            },
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return true;
-                }
-            }
-        );
-
-        streams[0].to("topic2");
-        streams[1].to("topic3");
-
-        KafkaStreams kstream = new KafkaStreams(builder, config);
-        kstream.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
deleted file mode 100644
index e17c16b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-
-import java.util.Properties;
-
-public class ProcessorJob {
-
-    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
-
-        @Override
-        public Processor<String, String> get() {
-            return new Processor<String, String>() {
-                private ProcessorContext context;
-                private KeyValueStore<String, Integer> kvStore;
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public void init(ProcessorContext context) {
-                    this.context = context;
-                    this.context.schedule(1000);
-                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("LOCAL-STATE");
-                }
-
-                @Override
-                public void process(String key, String value) {
-                    Integer oldValue = this.kvStore.get(key);
-                    Integer newValue = Integer.parseInt(value);
-                    if (oldValue == null) {
-                        this.kvStore.put(key, newValue);
-                    } else {
-                        this.kvStore.put(key, oldValue + newValue);
-                    }
-
-                    context.commit();
-                }
-
-                @Override
-                public void punctuate(long timestamp) {
-                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
-
-                    while (iter.hasNext()) {
-                        KeyValue<String, Integer> entry = iter.next();
-
-                        System.out.println("[" + entry.key + ", " + entry.value + "]");
-
-                        context.forward(entry.key, entry.value);
-                    }
-
-                    iter.close();
-                }
-
-                @Override
-                public void close() {
-                    this.kvStore.close();
-                }
-            };
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamsConfig config = new StreamsConfig(props);
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
-
-        builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
-        builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");
-
-        builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
-
-        KafkaStreams streams = new KafkaStreams(builder, config);
-        streams.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
deleted file mode 100644
index 26281d6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-public class WallclockTimestampExtractor implements TimestampExtractor {
-    @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
-        return System.currentTimeMillis();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
index 3c1ed46..8780cc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream;
 
-public class Count<K> implements Aggregator<K, Long, Long> {
+public class Count<K, V> implements Aggregator<K, V, Long> {
 
     @Override
     public Long initialValue(K aggKey) {
@@ -25,12 +25,12 @@ public class Count<K> implements Aggregator<K, Long, Long> {
     }
 
     @Override
-    public Long add(K aggKey, Long value, Long aggregate) {
+    public Long add(K aggKey, V value, Long aggregate) {
         return aggregate + 1L;
     }
 
     @Override
-    public Long remove(K aggKey, Long value, Long aggregate) {
+    public Long remove(K aggKey, V value, Long aggregate) {
         return aggregate - 1L;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
deleted file mode 100644
index ae3b858..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface KeyValueToDoubleMapper<K, V> {
-
-    double apply(K key, V value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
deleted file mode 100644
index 72e5ee9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface KeyValueToIntMapper<K, V> {
-
-    int apply(K key, V value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
deleted file mode 100644
index 3a8d8a8..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface KeyValueToLongMapper<K, V> {
-
-    long apply(K key, V value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
index 02ece3a..188fe66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.kstream.internals.TumblingWindow;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 public class TumblingWindows extends Windows<TumblingWindow> {
@@ -53,7 +53,11 @@ public class TumblingWindows extends Windows<TumblingWindow> {
     public Map<Long, TumblingWindow> windowsFor(long timestamp) {
         long windowStart = timestamp - timestamp % size;
 
-        return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size));
+        // we cannot use Collections.singleMap since it does not support remove() call
+        Map<Long, TumblingWindow> windows = new HashMap<>();
+        windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size));
+
+        return windows;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 6f47253..06882b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 public class UnlimitedWindows extends Windows<UnlimitedWindow> {
@@ -48,7 +48,13 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     @Override
     public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
         // always return the single unlimited window
-        return Collections.singletonMap(start, new UnlimitedWindow(start));
+
+        // we cannot use Collections.singleMap since it does not support remove() call
+        Map<Long, UnlimitedWindow> windows = new HashMap<>();
+        windows.put(start, new UnlimitedWindow(start));
+
+
+        return windows;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 98e50c3..7ebc28c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.Serdes;
 
@@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
         String name = topology.newName(SINK_NAME);
-        StreamsPartitioner<K, V> streamsPartitioner = null;
+        StreamPartitioner<K, V> streamPartitioner = null;
 
         if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer);
+            streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
         }
 
-        topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name);
+        topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
new file mode 100644
index 0000000..10e69cc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
+
+    private final WindowedSerializer<K> serializer;
+
+    public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
+        this.serializer = serializer;
+    }
+
+    /**
+     * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
+     * and the current number of partitions. The partition number id determined by the original key of the windowed key
+     * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
+     *
+     * @param windowedKey the key of the message
+     * @param value the value of the message
+     * @param numPartitions the total number of partitions
+     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+     */
+    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
+        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+
+        // hash the keyBytes to choose a partition
+        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+    }
+
+    private static int toPositive(int number) {
+        return number & 0x7fffffff;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
deleted file mode 100644
index ff1fa2c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
-
-public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> {
-
-    private final WindowedSerializer<K> serializer;
-
-    public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) {
-        this.serializer = serializer;
-    }
-
-    /**
-     * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value
-     * and the current number of partitions. The partition number id determined by the original key of the windowed key
-     * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
-     *
-     * @param windowedKey the key of the message
-     * @param value the value of the message
-     * @param numPartitions the total number of partitions
-     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
-     */
-    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
-        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
-
-        // hash the keyBytes to choose a partition
-        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
-    }
-
-    private static int toPositive(int number) {
-        return number & 0x7fffffff;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
new file mode 100644
index 0000000..f14d9d9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
+ * <p>
+ * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so
+ * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
+ * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
+ * <p>
+ * When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only
+ * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
+ * automatically manage these instances, and adjust when new topology instances are added or removed.
+ * <p>
+ * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
+ * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each message should be written.
+ * <p>
+ * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
+ * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
+ * for that topic.
+ * <p>
+ * All StreamPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
+ * 
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ *      org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
+ * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
+ */
+public interface StreamPartitioner<K, V> {
+
+    /**
+     * Determine the partition number for a message with the given key and value and the current number of partitions.
+     * 
+     * @param key the key of the message
+     * @param value the value of the message
+     * @param numPartitions the total number of partitions
+     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+     */
+    Integer partition(K key, V value, int numPartitions);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
deleted file mode 100644
index f8d199d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-/**
- * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
- * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
- * <p>
- * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so
- * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
- * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
- * <p>
- * When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only
- * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
- * automatically manage these instances, and adjust when new topology instances are added or removed.
- * <p>
- * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
- * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
- * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
- * determine to which partition each message should be written.
- * <p>
- * To do this, create a <code>StreamsPartitioner</code> implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink}
- * for that topic.
- * <p>
- * All StreamsPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
- * 
- * @param <K> the type of keys
- * @param <V> the type of values
- * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
- *      org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...)
- * @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...)
- */
-public interface StreamsPartitioner<K, V> {
-
-    /**
-     * Determine the partition number for a message with the given key and value and the current number of partitions.
-     * 
-     * @param key the key of the message
-     * @param value the value of the message
-     * @param numPartitions the total number of partitions
-     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
-     */
-    Integer partition(K key, V value, int numPartitions);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index f4e6821..a6b54b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -135,9 +135,9 @@ public class TopologyBuilder {
         public final String topic;
         private Serializer keySerializer;
         private Serializer valSerializer;
-        private final StreamsPartitioner partitioner;
+        private final StreamPartitioner partitioner;
 
-        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) {
+        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
             super(name);
             this.parents = parents.clone();
             this.topic = topic;
@@ -245,9 +245,9 @@ public class TopologyBuilder {
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
-     * @see #addSink(String, String, StreamsPartitioner, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
     public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
@@ -260,7 +260,7 @@ public class TopologyBuilder {
      * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      * <p>
-     * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
+     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
      * the named Kafka topic's partitions. Such control is often useful with topologies that use
      * {@link #addStateStore(StateStoreSupplier, String...) state stores}
      * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -274,9 +274,9 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... parentNames) {
+    public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
     }
 
@@ -284,7 +284,7 @@ public class TopologyBuilder {
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
      * The sink will use the specified key and value serializers.
      * <p>
-     * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
+     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
      * the named Kafka topic's partitions. Such control is often useful with topologies that use
      * {@link #addStateStore(StateStoreSupplier, String...) state stores}
      * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -302,11 +302,11 @@ public class TopologyBuilder {
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamsPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
     public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
-        return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames);
+        return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
     }
 
     /**
@@ -326,10 +326,10 @@ public class TopologyBuilder {
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamsPartitioner, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      */
-    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner, String... parentNames) {
+    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
         if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index ef4c3c7..68680ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -57,7 +57,8 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString());
+            File jobStateDir = StreamThread.makeStateDir(jobId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+            File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString());
             // if partitions is null, this is a standby task
             this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 25c663d..fe0472e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +72,7 @@ public class RecordCollector {
     }
 
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                            StreamsPartitioner<K, V> partitioner) {
+                            StreamPartitioner<K, V> partitioner) {
         byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
         byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
         Integer partition = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 88b3f56..7ab59ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 
 public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
     private final String topic;
     private Serializer<K> keySerializer;
     private Serializer<V> valSerializer;
-    private final StreamsPartitioner<K, V> partitioner;
+    private final StreamPartitioner<K, V> partitioner;
 
     private ProcessorContext context;
 
-    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner) {
+    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
         super(name);
 
         this.topic = topic;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e5d0922..f118f60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -104,6 +104,18 @@ public class StreamThread extends Thread {
     private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
 
+    static File makeStateDir(String jobId, String baseDirName) {
+        File baseDir = new File(baseDirName);
+        if (!baseDir.exists())
+            baseDir.mkdir();
+
+        File stateDir = new File(baseDir, jobId);
+        if (!stateDir.exists())
+            stateDir.mkdir();
+
+        return stateDir;
+    }
+
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -167,8 +179,7 @@ public class StreamThread extends Thread {
         this.standbyRecords = new HashMap<>();
 
         // read in task specific config values
-        this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
-        this.stateDir.mkdir();
+        this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -452,14 +463,15 @@ public class StreamThread extends Thread {
             if (stateDirs != null) {
                 for (File dir : stateDirs) {
                     try {
-                        TaskId id = TaskId.parse(dir.getName());
+                        String dirName = dir.getName();
+                        TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1));
 
                         // try to acquire the exclusive lock on the state directory
                         FileLock directoryLock = null;
                         try {
                             directoryLock = ProcessorStateManager.lockStateDirectory(dir);
                             if (directoryLock != null) {
-                                log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs);
+                                log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs);
                                 Utils.delete(dir);
                             }
                         } catch (IOException e) {