You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/14 02:16:02 UTC

[2/2] kafka git commit: KAFKA-3081: KTable Aggregation

KAFKA-3081: KTable Aggregation

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #761 from guozhangwang/K3081


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f22705c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f22705c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f22705c

Branch: refs/heads/trunk
Commit: 4f22705c7d0c8e8cab68883e76f554439341e34a
Parents: 7001174
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jan 13 17:15:57 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jan 13 17:15:57 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KTable.java    |  58 +---
 .../kstream/internals/ChangedDeserializer.java  |  59 ++++
 .../kstream/internals/ChangedSerializer.java    |  57 ++++
 .../kstream/internals/CountSupplier.java        |  52 ++++
 .../internals/DefaultWindowedDeserializer.java  |  59 ----
 .../internals/DefaultWindowedSerializer.java    |  57 ----
 .../streams/kstream/internals/KStreamImpl.java  |  58 ++--
 .../kstream/internals/KTableAggregate.java      | 118 +++++++
 .../streams/kstream/internals/KTableImpl.java   | 215 +++++++------
 .../kstream/internals/KTableMapValues.java      |   8 +-
 .../kstream/internals/KTableRepartitionMap.java | 110 +++++++
 .../kstream/internals/LongSumSupplier.java      |  52 ++++
 .../kstream/internals/WindowedDeserializer.java |  59 ++++
 .../kstream/internals/WindowedSerializer.java   |  57 ++++
 .../kstream/internals/KTableAggregateTest.java  | 122 ++++++++
 .../internals/KTableMapValuesImplTest.java      | 308 -------------------
 .../kstream/internals/KTableMapValuesTest.java  | 308 +++++++++++++++++++
 .../apache/kafka/test/NoOpKeyValueMapper.java   |  29 ++
 18 files changed, 1195 insertions(+), 591 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 997edcd..9837dae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -20,8 +20,6 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 
-import java.util.Collection;
-
 /**
  * KTable is an abstraction of a change log stream.
  *
@@ -152,9 +150,11 @@ public interface KTable<K, V> {
      */
     <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
                                           KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                          Serializer<K> keySerializer,
+                                          Serializer<K1> keySerializer,
+                                          Serializer<V1> valueSerializer,
                                           Serializer<V2> aggValueSerializer,
-                                          Deserializer<K> keyDeserializer,
+                                          Deserializer<K1> keyDeserializer,
+                                          Deserializer<V1> valueDeserializer,
                                           Deserializer<V2> aggValueDeserializer,
                                           String name);
 
@@ -167,59 +167,21 @@ public interface KTable<K, V> {
      */
     <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
                               KeyValueToLongMapper<K, V> valueSelector,
-                              Serializer<K> keySerializer,
-                              Deserializer<K> keyDeserializer,
+                              Serializer<K1> keySerializer,
+                              Deserializer<K1> keyDeserializer,
                               String name);
 
     /**
-     * Sum extracted integer values of this table by the selected aggregation key
-     *
-     * @param keySelector the class of KeyValueMapper to select the aggregation key
-     * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value
-     * @param name the name of the resulted table
-     */
-    <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
-                                 KeyValueToIntMapper<K, V> valueSelector,
-                                 Serializer<K> keySerializer,
-                                 Deserializer<K> keyDeserializer,
-                                 String name);
-
-    /**
-     * Sum extracted double decimal values of this table by the selected aggregation key
-     *
-     * @param keySelector the class of KeyValueMapper to select the aggregation key
-     * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value
-     * @param name the name of the resulted table
-     */
-    <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
-                                KeyValueToDoubleMapper<K, V> valueSelector,
-                                Serializer<K> keySerializer,
-                                Deserializer<K> keyDeserializer,
-                                String name);
-
-    /**
      * Count number of records of this table by the selected aggregation key
      *
      * @param keySelector the class of KeyValueMapper to select the aggregation key
      * @param name the name of the resulted table
      */
     <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
-                                Serializer<K> keySerializer,
-                                Deserializer<K> keyDeserializer,
+                                Serializer<K1> keySerializer,
+                                Serializer<V> valueSerializer,
+                                Deserializer<K1> keyDeserializer,
+                                Deserializer<V> valueDeserializer,
                                 String name);
 
-    /**
-     * Get the top-k values of this table by the selected aggregation key
-     *
-     * @param k parameter of the top-k computation
-     * @param keySelector the class of KeyValueMapper to select the aggregation key
-     * @param name the name of the resulted table
-     */
-    <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
-                                                                    KeyValueMapper<K, V, K1> keySelector,
-                                                                    Serializer<K> keySerializer,
-                                                                    Serializer<V1> aggValueSerializer,
-                                                                    Deserializer<K> keyDeserializer,
-                                                                    Deserializer<V1> aggValueDeserializer,
-                                                                    String name);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
new file mode 100644
index 0000000..d4c4e2d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
+
+    private static final int NEWFLAG_SIZE = 1;
+
+    private final Deserializer<T> inner;
+
+    public ChangedDeserializer(Deserializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // do nothing
+    }
+
+    @Override
+    public Change<T> deserialize(String topic, byte[] data) {
+
+        byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
+
+        System.arraycopy(data, 0, bytes, 0, bytes.length);
+
+        if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
+            return new Change<>(inner.deserialize(topic, bytes), null);
+        } else {
+            return new Change<>(null, inner.deserialize(topic, bytes));
+        }
+    }
+
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
new file mode 100644
index 0000000..e9b7cad
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ChangedSerializer<T> implements Serializer<Change<T>> {
+
+    private static final int NEWFLAG_SIZE = 1;
+
+    private final Serializer<T> inner;
+
+    public ChangedSerializer(Serializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // do nothing
+    }
+
+    @Override
+    public byte[] serialize(String topic, Change<T> data) {
+        // only one of the old / new values would be not null
+        byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue : data.oldValue);
+
+        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
+        buf.put(serializedKey);
+        buf.put((byte) (data.newValue != null ? 1 : 0));
+
+        return buf.array();
+    }
+
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
new file mode 100644
index 0000000..b7dc5aa
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+
+public class CountSupplier<K, V> implements AggregatorSupplier<K, V, Long> {
+
+    private class Count implements Aggregator<K, V, Long> {
+
+        @Override
+        public Long initialValue() {
+            return 0L;
+        }
+
+        @Override
+        public Long add(K aggKey, V value, Long aggregate) {
+            return aggregate + 1;
+        }
+
+        @Override
+        public Long remove(K aggKey, V value, Long aggregate) {
+            return aggregate - 1;
+        }
+
+        @Override
+        public Long merge(Long aggr1, Long aggr2) {
+            return aggr1 + aggr2;
+        }
+    }
+
+    @Override
+    public Aggregator<K, V, Long> get() {
+        return new Count();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
deleted file mode 100644
index 9a14c53..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-public class DefaultWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
-
-    private static final int TIMESTAMP_SIZE = 8;
-
-    private Deserializer<T> inner;
-
-    public DefaultWindowedDeserializer(Deserializer<T> inner) {
-        this.inner = inner;
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // do nothing
-    }
-
-    @Override
-    public Windowed<T> deserialize(String topic, byte[] data) {
-
-        byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
-
-        System.arraycopy(data, 0, bytes, 0, bytes.length);
-
-        long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
-
-        // always read as unlimited window
-        return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start));
-    }
-
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
deleted file mode 100644
index 4bf2b28..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-public class DefaultWindowedSerializer<T> implements Serializer<Windowed<T>> {
-
-    private static final int TIMESTAMP_SIZE = 8;
-
-    private Serializer<T> inner;
-
-    public DefaultWindowedSerializer(Serializer<T> inner) {
-        this.inner = inner;
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // do nothing
-    }
-
-    @Override
-    public byte[] serialize(String topic, Windowed<T> data) {
-        byte[] serializedKey = inner.serialize(topic, data.value());
-
-        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
-        buf.put(serializedKey);
-        buf.putLong(data.window().start());
-
-        return buf.array();
-    }
-
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 4505e74..f53c0d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -250,16 +250,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerialzier,
-            Serializer<V> thisValueSerialzier,
-            Serializer<V1> otherValueSerialzier,
-            Deserializer<K> keyDeserialier,
-            Deserializer<V> thisValueDeserialzier,
-            Deserializer<V1> otherValueDeserialzier) {
+            Serializer<K> keySerializer,
+            Serializer<V> thisValueSerializer,
+            Serializer<V1> otherValueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V> thisValueDeserializer,
+            Deserializer<V1> otherValueDeserializer) {
 
         return join(other, joiner, windows,
-                keySerialzier, thisValueSerialzier, otherValueSerialzier,
-                keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false);
+                keySerializer, thisValueSerializer, otherValueSerializer,
+                keyDeserializer, thisValueDeserializer, otherValueDeserializer, false);
     }
 
     @Override
@@ -267,16 +267,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerialzier,
-            Serializer<V> thisValueSerialzier,
-            Serializer<V1> otherValueSerialzier,
-            Deserializer<K> keyDeserialier,
-            Deserializer<V> thisValueDeserialzier,
-            Deserializer<V1> otherValueDeserialzier) {
+            Serializer<K> keySerializer,
+            Serializer<V> thisValueSerializer,
+            Serializer<V1> otherValueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V> thisValueDeserializer,
+            Deserializer<V1> otherValueDeserializer) {
 
         return join(other, joiner, windows,
-                keySerialzier, thisValueSerialzier, otherValueSerialzier,
-                keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true);
+                keySerializer, thisValueSerializer, otherValueSerializer,
+                keyDeserializer, thisValueDeserializer, otherValueDeserializer, true);
     }
 
     @SuppressWarnings("unchecked")
@@ -284,12 +284,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerialzier,
-            Serializer<V> thisValueSerialzier,
-            Serializer<V1> otherValueSerialzier,
-            Deserializer<K> keyDeserialier,
-            Deserializer<V> thisValueDeserialzier,
-            Deserializer<V1> otherValueDeserialzier,
+            Serializer<K> keySerializer,
+            Serializer<V> thisValueSerializer,
+            Serializer<V1> otherValueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V> thisValueDeserializer,
+            Deserializer<V1> otherValueDeserializer,
             boolean outer) {
 
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
@@ -301,7 +301,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                         windows.after,
                         windows.maintainMs(),
                         windows.segments,
-                        new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier),
+                        new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
                         null);
 
         RocksDBWindowStoreSupplier<K, V1> otherWindow =
@@ -311,7 +311,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                         windows.after,
                         windows.maintainMs(),
                         windows.segments,
-                        new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
+                        new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
                         null);
 
         KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name());
@@ -344,10 +344,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerialzier,
-            Serializer<V1> otherValueSerialzier,
-            Deserializer<K> keyDeserialier,
-            Deserializer<V1> otherValueDeserialzier) {
+            Serializer<K> keySerializer,
+            Serializer<V1> otherValueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V1> otherValueDeserializer) {
 
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
@@ -358,7 +358,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                         windows.after,
                         windows.maintainMs(),
                         windows.segments,
-                        new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
+                        new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
                         null);
 
         KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
new file mode 100644
index 0000000..a5948f8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
+
+    private final String storeName;
+    private final Aggregator<K, V, T> aggregator;
+
+    private boolean sendOldValues = false;
+
+    KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
+        this.storeName = storeName;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    @Override
+    public Processor<K, Change<V>> get() {
+        return new KTableAggregateProcessor();
+    }
+
+    private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
+
+        private KeyValueStore<K, T> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(K key, Change<V> value) {
+            T oldAgg = store.get(key);
+
+            if (oldAgg == null)
+                oldAgg = aggregator.initialValue();
+
+            T newAgg = oldAgg;
+
+            // first try to remove the old value
+            if (value.oldValue != null) {
+                newAgg = aggregator.remove(key, value.oldValue, newAgg);
+            }
+
+            // then try to add the new new value
+            if (value.newValue != null) {
+                newAgg = aggregator.add(key, value.newValue, newAgg);
+            }
+
+            // update the store with the new value
+            store.put(key, newAgg);
+
+            // send the old / new pair
+            if (sendOldValues)
+                context().forward(key, new Change<>(newAgg, oldAgg));
+            else
+                context().forward(key, new Change<>(newAgg, null));
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, T> view() {
+
+        return new KTableValueGetterSupplier<K, T>() {
+
+            public KTableValueGetter<K, T> get() {
+                return new KTableAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KTableAggregateValueGetter implements KTableValueGetter<K, T> {
+
+        private KeyValueStore<K, T> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public T get(K key) {
+            return store.get(key);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 32d3cc5..7f30f59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.AggregatorSupplier;
 import org.apache.kafka.streams.kstream.KStream;
@@ -25,16 +27,15 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
-import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
 import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
 
-import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 
 /**
@@ -51,6 +52,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
+    private static final String SELECT_NAME = "KTABLE-SELECT-";
+
+    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
+
     public static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
     public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
@@ -169,47 +174,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @SuppressWarnings("unchecked")
-    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
-        if (processorSupplier instanceof KTableSource) {
-            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
-            materialize(source);
-            return new KTableSourceValueGetterSupplier<>(source.topic);
-        } else {
-            return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    void enableSendingOldValues() {
-        if (!sendOldValues) {
-            if (processorSupplier instanceof KTableSource) {
-                KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
-                materialize(source);
-                source.enableSendingOldValues();
-            } else {
-                ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
-            }
-            sendOldValues = true;
-        }
-    }
-
-    boolean sendingOldValueEnabled() {
-        return sendOldValues;
-    }
-
-    private void materialize(KTableSource<K, ?> source) {
-        synchronized (source) {
-            if (!source.isMaterialized()) {
-                StateStoreSupplier storeSupplier =
-                        new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
-                // mark this state as non internal hence it is read directly from a user topic
-                topology.addStateStore(storeSupplier, false, name);
-                source.materialize();
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
@@ -281,63 +245,142 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
                                                  KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                                 Serializer<K> keySerializer,
+                                                 Serializer<K1> keySerializer,
+                                                 Serializer<V1> valueSerializer,
                                                  Serializer<V2> aggValueSerializer,
-                                                 Deserializer<K> keyDeserializer,
+                                                 Deserializer<K1> keyDeserializer,
+                                                 Deserializer<V1> valueDeserializer,
                                                  Deserializer<V2> aggValueDeserializer,
                                                  String name) {
-        // TODO
-        return null;
+
+        String selectName = topology.newName(SELECT_NAME);
+        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
+        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
+        String aggregateName = topology.newName(AGGREGATE_NAME);
+
+        String topic = name + "-repartition";
+
+        ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+        ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
+
+        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
+
+        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get());
+
+        StateStoreSupplier aggregateStore = Stores.create(name)
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(aggValueSerializer, aggValueDeserializer)
+                .localDatabase()
+                .build();
+
+        // select the aggregate key and values (old and new), it would require parent to send old values
+        topology.addProcessor(selectName, selectSupplier, this.name);
+        this.enableSendingOldValues();
+
+        // send the aggregate key-value pairs to the intermediate topic for partitioning
+        topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
+
+        // read the intermediate topic
+        topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
+
+        // aggregate the values with the aggregator and local store
+        topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
+        topology.addStateStore(aggregateStore, aggregateName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName));
     }
 
     @Override
-    public <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
-                                     KeyValueToLongMapper<K, V> valueSelector,
-                                     Serializer<K> keySerializer,
-                                     Deserializer<K> keyDeserializer,
+    public <K1> KTable<K1, Long> sum(final KeyValueMapper<K, V, K1> keySelector,
+                                     final KeyValueToLongMapper<K, V> valueSelector,
+                                     Serializer<K1> keySerializer,
+                                     Deserializer<K1> keyDeserializer,
                                      String name) {
-        // TODO
-        return null;
-    }
 
-    @Override
-    public <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
-                                        KeyValueToIntMapper<K, V> valueSelector,
-                                        Serializer<K> keySerializer,
-                                        Deserializer<K> keyDeserializer,
-                                        String name) {
-        // TODO
-        return null;
+        Serializer<Long> longSerializer = new LongSerializer();
+        Deserializer<Long> longDeserializer = new LongDeserializer();
+
+        KeyValueMapper<K, V, KeyValue<K1, Long>> mapper = new KeyValueMapper<K, V, KeyValue<K1, Long>>() {
+            @Override
+            public KeyValue<K1, Long> apply(K key, V value) {
+                K1 aggKey = keySelector.apply(key, value);
+                Long aggValue = valueSelector.apply(key, value);
+
+                return new KeyValue<>(aggKey, aggValue);
+            }
+        };
+
+        return this.<K1, Long, Long>aggregate(new LongSumSupplier<K1>(), mapper,
+                keySerializer, longSerializer, longSerializer,
+                keyDeserializer, longDeserializer, longDeserializer,
+                name);
     }
 
     @Override
-    public <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
-                                       KeyValueToDoubleMapper<K, V> valueSelector,
-                                       Serializer<K> keySerializer,
-                                       Deserializer<K> keyDeserializer,
+    public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> keySelector,
+                                       Serializer<K1> keySerializer,
+                                       Serializer<V> valueSerializer,
+                                       Deserializer<K1> keyDeserializer,
+                                       Deserializer<V> valueDeserializer,
                                        String name) {
-        // TODO
-        return null;
+
+        Serializer<Long> longSerializer = new LongSerializer();
+        Deserializer<Long> longDeserializer = new LongDeserializer();
+
+        KeyValueMapper<K, V, KeyValue<K1, V>> mapper = new KeyValueMapper<K, V, KeyValue<K1, V>>() {
+            @Override
+            public KeyValue<K1, V> apply(K key, V value) {
+                K1 aggKey = keySelector.apply(key, value);
+
+                return new KeyValue<>(aggKey, value);
+            }
+        };
+
+        return this.<K1, V, Long>aggregate(new CountSupplier<K1, V>(), mapper,
+                keySerializer, valueSerializer, longSerializer,
+                keyDeserializer, valueDeserializer, longDeserializer,
+                name);
     }
 
-    @Override
-    public <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
-                                       Serializer<K> keySerializer,
-                                       Deserializer<K> keyDeserializer,
-                                       String name) {
-        // TODO
-        return null;
+    @SuppressWarnings("unchecked")
+    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+        if (processorSupplier instanceof KTableSource) {
+            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+            materialize(source);
+            return new KTableSourceValueGetterSupplier<>(source.topic);
+        } else {
+            return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
+        }
     }
 
-    @Override
-    public <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
-                                                                           KeyValueMapper<K, V, K1> keySelector,
-                                                                           Serializer<K> keySerializer,
-                                                                           Serializer<V1> aggValueSerializer,
-                                                                           Deserializer<K> keyDeserializer,
-                                                                           Deserializer<V1> aggValueDeserializer,
-                                                                           String name) {
-        // TODO
-        return null;
+    @SuppressWarnings("unchecked")
+    void enableSendingOldValues() {
+        if (!sendOldValues) {
+            if (processorSupplier instanceof KTableSource) {
+                KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                materialize(source);
+                source.enableSendingOldValues();
+            } else {
+                ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
+            }
+            sendOldValues = true;
+        }
+    }
+
+    boolean sendingOldValueEnabled() {
+        return sendOldValues;
+    }
+
+    private void materialize(KTableSource<K, ?> source) {
+        synchronized (source) {
+            if (!source.isMaterialized()) {
+                StateStoreSupplier storeSupplier =
+                        new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+                // mark this state as non internal hence it is read directly from a user topic
+                topology.addStateStore(storeSupplier, false, name);
+                source.materialize();
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index be80855..c664906 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
+
 class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> {
 
     private final KTableImpl<K1, ?, V1> parent;
@@ -36,7 +37,7 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
 
     @Override
     public Processor<K1, Change<V1>> get() {
-        return new KTableMapProcessor();
+        return new KTableMapValuesProcessor();
     }
 
     @Override
@@ -67,16 +68,15 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         return newValue;
     }
 
-    private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
+    private class KTableMapValuesProcessor extends AbstractProcessor<K1, Change<V1>> {
 
         @Override
         public void process(K1 key, Change<V1> change) {
             V2 newValue = computeValue(change.newValue);
             V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
 
-            context().forward(key, new Change(newValue, oldValue));
+            context().forward(key, new Change<>(newValue, oldValue));
         }
-
     }
 
     private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
new file mode 100644
index 0000000..bbef7fb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+/**
+ * KTable repartition map functions are not exposed to public APIs, but only used for keyed aggregations.
+ *
+ * Given the input, it can output at most two records (one mapped from old value and one mapped from new value).
+ */
+public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupplier<K1, V1, KeyValue<K2, V2>> {
+
+    private final KTableImpl<K1, ?, V1> parent;
+    private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+
+    public KTableRepartitionMap(KTableImpl<K1, ?, V1> parent, KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+        this.parent = parent;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor<K1, Change<V1>> get() {
+        return new KTableMapProcessor();
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K1, KeyValue<K2, V2>> view() {
+        final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+
+        return new KTableValueGetterSupplier<K1, KeyValue<K2, V2>>() {
+
+            public KTableValueGetter<K1, KeyValue<K2, V2>> get() {
+                return new KTableMapValueGetter(parentValueGetterSupplier.get());
+            }
+
+        };
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        // this should never be called
+        throw new KafkaException("KTableRepartitionMap should always require sending old values.");
+    }
+
+    private KeyValue<K2, V2> computeValue(K1 key, V1 value) {
+        KeyValue<K2, V2> newValue = null;
+
+        if (key != null || value != null)
+            newValue = mapper.apply(key, value);
+
+        return newValue;
+    }
+
+    private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
+
+        @Override
+        public void process(K1 key, Change<V1> change) {
+            KeyValue<K2, V2> newPair = computeValue(key, change.newValue);
+
+            context().forward(newPair.key, new Change<>(newPair.value, null));
+
+            if (change.oldValue != null) {
+                KeyValue<K2, V2> oldPair = computeValue(key, change.oldValue);
+                context().forward(oldPair.key, new Change<>(null, oldPair.value));
+            }
+        }
+    }
+
+    private class KTableMapValueGetter implements KTableValueGetter<K1, KeyValue<K2, V2>> {
+
+        private final KTableValueGetter<K1, V1> parentGetter;
+
+        public KTableMapValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+            this.parentGetter = parentGetter;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            parentGetter.init(context);
+        }
+
+        @Override
+        public KeyValue<K2, V2> get(K1 key) {
+            return computeValue(key, parentGetter.get(key));
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
new file mode 100644
index 0000000..b66590e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+
+public class LongSumSupplier<K> implements AggregatorSupplier<K, Long, Long> {
+
+    private class LongSum implements Aggregator<K, Long, Long> {
+
+        @Override
+        public Long initialValue() {
+            return 0L;
+        }
+
+        @Override
+        public Long add(K aggKey, Long value, Long aggregate) {
+            return aggregate + value;
+        }
+
+        @Override
+        public Long remove(K aggKey, Long value, Long aggregate) {
+            return aggregate - value;
+        }
+
+        @Override
+        public Long merge(Long aggr1, Long aggr2) {
+            return aggr1 + aggr2;
+        }
+    }
+
+    @Override
+    public Aggregator<K, Long, Long> get() {
+        return new LongSum();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
new file mode 100644
index 0000000..96c3668
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class WindowedDeserializer<T> implements Deserializer<Windowed<T>> {
+
+    private static final int TIMESTAMP_SIZE = 8;
+
+    private Deserializer<T> inner;
+
+    public WindowedDeserializer(Deserializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // do nothing
+    }
+
+    @Override
+    public Windowed<T> deserialize(String topic, byte[] data) {
+
+        byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
+
+        System.arraycopy(data, 0, bytes, 0, bytes.length);
+
+        long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
+
+        // always read as unlimited window
+        return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start));
+    }
+
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
new file mode 100644
index 0000000..4407a5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
+
+    private static final int TIMESTAMP_SIZE = 8;
+
+    private Serializer<T> inner;
+
+    public WindowedSerializer(Serializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // do nothing
+    }
+
+    @Override
+    public byte[] serialize(String topic, Windowed<T> data) {
+        byte[] serializedKey = inner.serialize(topic, data.value());
+
+        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE);
+        buf.put(serializedKey);
+        buf.putLong(data.window().start());
+
+        return buf.array();
+    }
+
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
new file mode 100644
index 0000000..189cf9d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.NoOpKeyValueMapper;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+public class KTableAggregateTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+
+        private class StringCanonizer implements Aggregator<String, String, String> {
+
+            @Override
+            public String initialValue() {
+                return "";
+            }
+
+            @Override
+            public String add(String aggKey, String value, String aggregate) {
+                return aggregate + "+" + value;
+            }
+
+            @Override
+            public String remove(String aggKey, String value, String aggregate) {
+                return aggregate + "-" + value;
+            }
+
+            @Override
+            public String merge(String aggr1, String aggr2) {
+                return "(" + aggr1 + ") + (" + aggr2 + ")";
+            }
+        }
+
+        @Override
+        public Aggregator<String, String, String> get() {
+            return new StringCanonizer();
+        }
+    }
+
+    @Test
+    public void testAggBasic() throws Exception {
+        final File baseDir = Files.createTempDirectory("test").toFile();
+
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+
+            KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+            KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizeSupplier(),
+                    new NoOpKeyValueMapper<String, String>(),
+                    strSerializer,
+                    strSerializer,
+                    strSerializer,
+                    strDeserializer,
+                    strDeserializer,
+                    strDeserializer,
+                    "topic1-Canonized");
+
+            MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+            table2.toStream().process(proc2);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+            driver.process(topic1, "A", "1");
+            driver.process(topic1, "B", "2");
+            driver.process(topic1, "A", "3");
+            driver.process(topic1, "B", "4");
+            driver.process(topic1, "C", "5");
+            driver.process(topic1, "D", "6");
+            driver.process(topic1, "B", "7");
+            driver.process(topic1, "C", "8");
+
+            assertEquals(Utils.mkList(
+                    "A:+1",
+                    "B:+2",
+                    "A:+1+3", "A:+1+3-1",
+                    "B:+2+4", "B:+2+4-2",
+                    "C:+5",
+                    "D:+6",
+                    "B:+2+4-2+7", "B:+2+4-2+7-4",
+                    "C:+5+8", "C:+5+8-5"), proc2.processed);
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
deleted file mode 100644
index 037b30a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class KTableMapValuesImplTest {
-
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
-
-    @Test
-    public void testKTable() {
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        String topic1 = "topic1";
-
-        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
-        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
-            @Override
-            public Integer apply(String value) {
-                return new Integer(value);
-            }
-        });
-
-        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "02");
-        driver.process(topic1, "C", "03");
-        driver.process(topic1, "D", "04");
-
-        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
-    }
-
-    @Test
-    public void testValueGetter() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
-            String topic2 = "topic2";
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
-            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
-            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(topic2, serializer, serializer, deserializer, deserializer);
-
-            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
-
-            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-            getter1.init(driver.context());
-            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-            getter2.init(driver.context());
-            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-            getter3.init(driver.context());
-            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
-            getter4.init(driver.context());
-
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
-
-            assertEquals("01", getter1.get("A"));
-            assertEquals("01", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(1), getter2.get("A"));
-            assertEquals(new Integer(1), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("01", getter4.get("A"));
-            assertEquals("01", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
-
-            assertEquals("02", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(2), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertEquals(new Integer(2), getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("02", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", "03");
-
-            assertEquals("03", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(3), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("03", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", null);
-
-            assertNull(getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertNull(getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertNull(getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-        } finally {
-            Utils.delete(stateDir);
-        }
-    }
-
-    @Test
-    public void testNotSendingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
-
-            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
-
-            builder.addProcessor("proc", proc, table2.name);
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
-
-            assertFalse(table1.sendingOldValueEnabled());
-            assertFalse(table2.sendingOldValueEnabled());
-
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
-
-            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
-
-            proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
-
-            driver.process(topic1, "A", "03");
-
-            proc.checkAndClearResult("A:(3<-null)");
-
-            driver.process(topic1, "A", null);
-
-            proc.checkAndClearResult("A:(null<-null)");
-
-        } finally {
-            Utils.delete(stateDir);
-        }
-    }
-
-    @Test
-    public void testSendingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
-
-            table2.enableSendingOldValues();
-
-            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
-
-            builder.addProcessor("proc", proc, table2.name);
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
-
-            assertTrue(table1.sendingOldValueEnabled());
-            assertTrue(table2.sendingOldValueEnabled());
-
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
-
-            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
-
-            proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
-
-            driver.process(topic1, "A", "03");
-
-            proc.checkAndClearResult("A:(3<-2)");
-
-            driver.process(topic1, "A", null);
-
-            proc.checkAndClearResult("A:(null<-3)");
-
-        } finally {
-            Utils.delete(stateDir);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f22705c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
new file mode 100644
index 0000000..58f1c2a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class KTableMapValuesTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    @Test
+    public void testKTable() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+            @Override
+            public Integer apply(String value) {
+                return new Integer(value);
+            }
+        });
+
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "02");
+        driver.process(topic1, "C", "03");
+        driver.process(topic1, "D", "04");
+
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+            String topic2 = "topic2";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                    table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            getter1.init(driver.context());
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            getter2.init(driver.context());
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+            getter3.init(driver.context());
+            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+            getter4.init(driver.context());
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(1), getter2.get("A"));
+            assertEquals(new Integer(1), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(2), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertEquals(new Integer(2), getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", "03");
+
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(3), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.process(topic1, "A", null);
+
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertNull(getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+    @Test
+    public void testNotSendingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+
+            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc", proc, table2.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            assertFalse(table1.sendingOldValueEnabled());
+            assertFalse(table2.sendingOldValueEnabled());
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+
+            driver.process(topic1, "A", "03");
+
+            proc.checkAndClearResult("A:(3<-null)");
+
+            driver.process(topic1, "A", null);
+
+            proc.checkAndClearResult("A:(null<-null)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+    @Test
+    public void testSendingOldValue() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return new Integer(value);
+                        }
+                    });
+
+            table2.enableSendingOldValues();
+
+            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+
+            builder.addProcessor("proc", proc, table2.name);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            assertTrue(table1.sendingOldValueEnabled());
+            assertTrue(table2.sendingOldValueEnabled());
+
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
+
+            driver.process(topic1, "A", "03");
+
+            proc.checkAndClearResult("A:(3<-2)");
+
+            driver.process(topic1, "A", null);
+
+            proc.checkAndClearResult("A:(null<-3)");
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+}