Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/04 23:59:31 UTC

[2/2] kafka git commit: KAFKA-2856: Add KTable non-stateful APIs along with standby task support

KAFKA-2856: Add KTable non-stateful APIs along with standby task support

guozhangwang
* added KTable API and impl
* added standby support for KTable

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #604 from ymatsuda/add_ktable
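
For orientation, a minimal usage sketch of the non-stateful KTable API added by
this commit follows; the topic names, key/value types, and values are
illustrative only, and the default serializers/deserializers from the streaming
config are assumed:

    KStreamBuilder builder = new KStreamBuilder();

    // read a topic as a change log stream: each record is an update per key
    KTable<String, Integer> table = builder.table("input-topic");

    // the non-stateful transformations introduced here: filter and mapValues
    KTable<String, Integer> positives = table.filter(new Predicate<String, Integer>() {
        @Override
        public boolean test(String key, Integer value) {
            return value > 0; // null values (deletes) never reach the predicate
        }
    });

    KTable<String, String> formatted = positives.mapValues(new ValueMapper<Integer, String>() {
        @Override
        public String apply(Integer value) {
            return "v=" + value;
        }
    });

    // send the result to a topic, or view it as a record stream
    formatted.to("output-topic");
    KStream<String, String> changes = formatted.toStream();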


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39c3512e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39c3512e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39c3512e

Branch: refs/heads/trunk
Commit: 39c3512eceedebcb6e50f8c6c4ef66601ff7dbc4
Parents: cd54fc8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Dec 4 14:59:24 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Dec 4 14:59:24 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   |  22 +-
 .../kafka/streams/kstream/KStreamBuilder.java   |  41 +++
 .../apache/kafka/streams/kstream/KTable.java    | 108 ++++++++
 .../streams/kstream/internals/KStreamImpl.java  |   2 +-
 .../KTableDerivedValueGetterSupplier.java       |  28 ++
 .../streams/kstream/internals/KTableFilter.java |  87 ++++++
 .../streams/kstream/internals/KTableImpl.java   | 188 +++++++++++++
 .../kstream/internals/KTableMapValues.java      |  85 ++++++
 .../internals/KTableProcessorSupplier.java      |  26 ++
 .../streams/kstream/internals/KTableSource.java |  78 ++++++
 .../KTableSourceValueGetterSupplier.java        |  50 ++++
 .../kstream/internals/KTableStoreSupplier.java  |  58 ++++
 .../kstream/internals/KTableValueGetter.java    |  28 ++
 .../internals/KTableValueGetterSupplier.java    |  24 ++
 .../streams/processor/PartitionGrouper.java     |   2 +-
 .../apache/kafka/streams/processor/TaskId.java  |  12 +
 .../processor/internals/AbstractTask.java       |  14 +-
 .../KafkaStreamingPartitionAssignor.java        |  20 +-
 .../internals/ProcessorStateManager.java        |  49 +++-
 .../processor/internals/StandbyTask.java        |  25 +-
 .../streams/processor/internals/StreamTask.java |  18 +-
 .../processor/internals/StreamThread.java       |  78 ++++--
 .../internals/assignment/AssignmentInfo.java    | 129 +++++----
 .../state/KeyValueStoreChangeLogger.java        |  87 ++++++
 .../streams/state/MeteredKeyValueStore.java     |  95 +++----
 .../state/RocksDBKeyValueStoreSupplier.java     | 252 +-----------------
 .../kafka/streams/state/RocksDBStore.java       | 265 +++++++++++++++++++
 .../org/apache/kafka/streams/state/Serdes.java  |   3 +-
 .../kstream/internals/KTableFilterTest.java     | 137 ++++++++++
 .../kstream/internals/KTableImplTest.java       | 220 +++++++++++++++
 .../internals/KTableMapValuesImplTest.java      | 198 ++++++++++++++
 .../kstream/internals/KTableSourceTest.java     | 117 ++++++++
 .../KafkaStreamingPartitionAssignorTest.java    | 151 +++++------
 .../processor/internals/StandbyTaskTest.java    |  11 +-
 .../processor/internals/StreamTaskTest.java     |   4 +-
 .../processor/internals/StreamThreadTest.java   |  12 +-
 .../assignment/AssginmentInfoTest.java          |  11 +-
 .../apache/kafka/test/KStreamTestDriver.java    |  43 ++-
 .../apache/kafka/test/MockProcessorContext.java |  14 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   4 +-
 40 files changed, 2268 insertions(+), 528 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 992bd75..93303eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -33,7 +33,7 @@ public interface KStream<K, V> {
      * Creates a new instance of KStream consisting of all elements of this stream that satisfy a predicate
      *
      * @param predicate the instance of Predicate
-     * @return the stream with only those elements that satisfy the predicate
+     * @return the instance of KStream with only those elements that satisfy the predicate
      */
     KStream<K, V> filter(Predicate<K, V> predicate);
 
@@ -41,22 +41,22 @@ public interface KStream<K, V> {
      * Creates a new instance of KStream consisting of all elements of this stream that do not satisfy a predicate
      *
      * @param predicate the instance of Predicate
-     * @return the stream with only those elements that do not satisfy the predicate
+     * @return the instance of KStream with only those elements that do not satisfy the predicate
      */
     KStream<K, V> filterOut(Predicate<K, V> predicate);
 
     /**
-     * Creates a new stream by applying transforming each element in this stream into a different element in the new stream.
+     * Creates a new instance of KStream by transforming each element in this stream into a different element in the new stream.
      *
      * @param mapper the instance of KeyValueMapper
      * @param <K1>   the key type of the new stream
      * @param <V1>   the value type of the new stream
-     * @return the mapped stream
+     * @return the instance of KStream
      */
     <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
 
     /**
-     * Creates a new instance of KStream by applying transforming each value in this stream into a different value in the new stream.
+     * Creates a new instance of KStream by transforming each value in this stream into a different value in the new stream.
      *
      * @param mapper the instance of ValueMapper
      * @param <V1>   the value type of the new stream
@@ -65,7 +65,7 @@ public interface KStream<K, V> {
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
-     * Creates a new instance of KStream by applying transforming each element in this stream into zero or more elements in the new stream.
+     * Creates a new instance of KStream by transforming each element in this stream into zero or more elements in the new stream.
      *
      * @param mapper the instance of KeyValueMapper
      * @param <K1>   the key type of the new stream
@@ -75,7 +75,7 @@ public interface KStream<K, V> {
     <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
 
     /**
-     * Creates a new instance of KStream by applying transforming each value in this stream into zero or more values in the new stream.
+     * Creates a new instance of KStream by transforming each value in this stream into zero or more values in the new stream.
      *
      * @param processor the instance of Processor
      * @param <V1>      the value type of the new stream
@@ -103,11 +103,11 @@ public interface KStream<K, V> {
     KStream<K, V>[] branch(Predicate<K, V>... predicates);
 
     /**
-     * Sends key-value to a topic, also creates a new stream from the topic.
+     * Sends key-value pairs to a topic and also creates a new instance of KStream from the topic.
      * This is equivalent to calling to(topic) and from(topic).
      *
      * @param topic           the topic name
-     * @return the new stream that consumes the given topic
+     * @return the instance of KStream that consumes the given topic
      */
     KStream<K, V> through(String topic);
 
@@ -124,7 +124,7 @@ public interface KStream<K, V> {
      *                        if not specified the default key deserializer defined in the configuration will be used
      * @param valDeserializer value deserializer used to create the new KStream,
      *                        if not specified the default value deserializer defined in the configuration will be used
-     * @return the new stream that consumes the given topic
+     * @return the instance of KStream that consumes the given topic
      */
     KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
 
@@ -160,7 +160,7 @@ public interface KStream<K, V> {
      *
      * @param valueTransformerSupplier the instance of ValueTransformerSupplier
      * @param stateStoreNames the names of the state stores used by the processor
-     * @return the instance of KStream that contains transformed keys and values
+     * @return the instance of KStream that contains the keys and transformed values
      */
     <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ae8f694..ca1a10d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -18,7 +18,11 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
+import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 
 import java.util.Collections;
@@ -65,6 +69,43 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
+     * Creates a KTable instance for the specified topic.
+     * The default deserializers specified in the config are used.
+     *
+     * @param topic          the topic name
+     * @return KTable
+     */
+    public <K, V> KTable<K, V> table(String topic) {
+        return table(null, null, null, null, topic);
+    }
+
+    /**
+     * Creates a KTable instance for the specified topic.
+     *
+     * @param keySerializer   key serializer used to send key-value pairs,
+     *                        if not specified the default key serializer defined in the configuration will be used
+     * @param valSerializer   value serializer used to send key-value pairs,
+     *                        if not specified the default value serializer defined in the configuration will be used
+     * @param keyDeserializer key deserializer used to read this source KTable,
+     *                        if not specified the default deserializer defined in the configs will be used
+     * @param valDeserializer value deserializer used to read this source KTable,
+     *                        if not specified the default deserializer defined in the configs will be used
+     * @param topic           the topic name
+     * @return KTable
+     */
+    public <K, V> KTable<K, V> table(Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String topic) {
+        String source = newName(KStreamImpl.SOURCE_NAME);
+        String name = newName(KTableImpl.SOURCE_NAME);
+
+        addSource(source, keyDeserializer, valDeserializer, topic);
+
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableSource<>(topic);
+        addProcessor(name, processorSupplier, source);
+
+        return new KTableImpl<>(this, name, processorSupplier, source, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer);
+    }
+
+    /**
      * Creates a new stream by merging the given streams
      *
      * @param streams the streams to be merged

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
new file mode 100644
index 0000000..75fb87a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * KTable is an abstraction of a change log stream, where each record is
+ * interpreted as an update to the table for the record's key.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public interface KTable<K, V> {
+
+    /**
+     * Creates a new instance of KTable consisting of all elements of this stream that satisfy a predicate
+     *
+     * @param predicate the instance of Predicate
+     * @return the instance of KTable with only those elements that satisfy the predicate
+     */
+    KTable<K, V> filter(Predicate<K, V> predicate);
+
+    /**
+     * Creates a new instance of KTable consisting of all elements of this stream that do not satisfy a predicate
+     *
+     * @param predicate the instance of Predicate
+     * @return the instance of KTable with only those elements that do not satisfy the predicate
+     */
+    KTable<K, V> filterOut(Predicate<K, V> predicate);
+
+    /**
+     * Creates a new instance of KTable by transforming each value in this stream into a different value in the new stream.
+     *
+     * @param mapper the instance of ValueMapper
+     * @param <V1>   the value type of the new stream
+     * @return the instance of KTable
+     */
+    <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
+
+    /**
+     * Sends key-value pairs to a topic and also creates a new instance of KTable from the topic.
+     * This is equivalent to calling to(topic) and table(topic).
+     *
+     * @param topic           the topic name
+     * @return the instance of KTable that consumes the given topic
+     */
+    KTable<K, V> through(String topic);
+
+    /**
+     * Sends key-value pairs to a topic and also creates a new instance of KTable from the topic.
+     * This is equivalent to calling to(topic) and table(topic).
+     *
+     * @param topic           the topic name
+     * @param keySerializer   key serializer used to send key-value pairs,
+     *                        if not specified the default key serializer defined in the configuration will be used
+     * @param valSerializer   value serializer used to send key-value pairs,
+     *                        if not specified the default value serializer defined in the configuration will be used
+     * @param keyDeserializer key deserializer used to create the new KTable,
+     *                        if not specified the default key deserializer defined in the configuration will be used
+     * @param valDeserializer value deserializer used to create the new KTable,
+     *                        if not specified the default value deserializer defined in the configuration will be used
+     * @return the instance of KTable that consumes the given topic
+     */
+    KTable<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
+
+    /**
+     * Sends key-value pairs to a topic using the default serializers specified in the config.
+     *
+     * @param topic         the topic name
+     */
+    void to(String topic);
+
+    /**
+     * Sends key-value pairs to a topic.
+     *
+     * @param topic         the topic name
+     * @param keySerializer key serializer used to send key-value pairs,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param valSerializer value serializer used to send key-value pairs,
+     *                      if not specified the default serializer defined in the configs will be used
+     */
+    void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
+
+    /**
+     * Creates a new instance of KStream from this KTable
+     *
+     * @return the instance of KStream
+     */
+    KStream<K, V> toStream();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 04aa8e9..fc8f4c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -59,7 +59,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
-    private static final String SINK_NAME = "KSTREAM-SINK-";
+    public static final String SINK_NAME = "KSTREAM-SINK-";
 
     public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
new file mode 100644
index 0000000..731d7f7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public abstract class KTableDerivedValueGetterSupplier<K, V1, V2> implements KTableValueGetterSupplier<K, V2> {
+
+    protected final KTableValueGetterSupplier<K, V1> parentValueGetterSupplier;
+
+    public KTableDerivedValueGetterSupplier(KTableValueGetterSupplier<K, V1> parentValueGetterSupplier) {
+        this.parentValueGetterSupplier = parentValueGetterSupplier;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
new file mode 100644
index 0000000..212b1c9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
+
+    private final Predicate<K, V> predicate;
+    private final boolean filterOut;
+
+    public KTableFilter(Predicate<K, V> predicate, boolean filterOut) {
+        this.predicate = predicate;
+        this.filterOut = filterOut;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KTableFilterProcessor();
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+        return new KTableDerivedValueGetterSupplier<K, V, V>(parentValueGetterSupplier) {
+
+            public KTableValueGetter<K, V> get() {
+                return new KTableFilterValueGetter(parentValueGetterSupplier.get());
+            }
+
+        };
+    }
+
+    private V computeNewValue(K key, V value) {
+        V newValue = null;
+
+        if (value != null && (filterOut ^ predicate.test(key, value)))
+            newValue = value;
+
+        return newValue;
+    }
+
+    private class KTableFilterProcessor extends AbstractProcessor<K, V> {
+
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, computeNewValue(key, value));
+        }
+
+    }
+
+    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V> parentGetter;
+
+        public KTableFilterValueGetter(KTableValueGetter<K, V> parentGetter) {
+            this.parentGetter = parentGetter;
+        }
+
+        public void init(ProcessorContext context) {
+            parentGetter.init(context);
+        }
+
+        public V get(K key) {
+            return computeNewValue(key, parentGetter.get(key));
+        }
+
+    }
+
+}
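
A note on the filter logic above: the expression filterOut ^ predicate.test(key, value)
in computeNewValue implements both filter() and filterOut() with a single processor.
With filterOut == false a value is kept when the predicate passes; with filterOut == true
it is kept when the predicate fails. A null value never reaches the predicate, and
filtered-out values are forwarded as null, so downstream processors see the same
tombstone-style update for deletes and for keys that drop out of the view.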

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
new file mode 100644
index 0000000..5b2b031
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Collections;
+
+/**
+ * The implementation class of KTable
+ * @param <K> the key type
+ * @param <S> the source's (parent's) value type
+ * @param <V> the value type
+ */
+public class KTableImpl<K, S, V> implements KTable<K, V> {
+
+    private static final String FILTER_NAME = "KTABLE-FILTER-";
+
+    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
+
+    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+
+    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+
+    protected final KStreamBuilder topology;
+    public final String name;
+    public final KTableProcessorSupplier<K, S, V> processorSupplier;
+    private final String sourceNode;
+
+    private final KTableImpl<K, ?, S> parent;
+    private final String topic;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valSerializer;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valDeserializer;
+
+    public KTableImpl(KStreamBuilder topology,
+                      String name,
+                      KTableProcessorSupplier<K, S, V> processorSupplier,
+                      String sourceNode,
+                      KTableImpl<K, ?, S> parent) {
+        this(topology, name, processorSupplier, sourceNode, null, null, null, null, null, parent);
+    }
+
+    public KTableImpl(KStreamBuilder topology,
+                      String name,
+                      KTableProcessorSupplier<K, S, V> processorSupplier,
+                      String sourceNode,
+                      String topic,
+                      Serializer<K> keySerializer,
+                      Serializer<V> valSerializer,
+                      Deserializer<K> keyDeserializer,
+                      Deserializer<V> valDeserializer) {
+        this(topology, name, processorSupplier, sourceNode, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer, null);
+    }
+
+    private KTableImpl(KStreamBuilder topology,
+                       String name,
+                       KTableProcessorSupplier<K, S, V> processorSupplier,
+                       String sourceNode,
+                       String topic,
+                       Serializer<K> keySerializer,
+                       Serializer<V> valSerializer,
+                       Deserializer<K> keyDeserializer,
+                       Deserializer<V> valDeserializer,
+                       KTableImpl<K, ?, S> parent) {
+        this.topology = topology;
+        this.name = name;
+        this.processorSupplier = processorSupplier;
+        this.sourceNode = sourceNode;
+        this.topic = topic;
+        this.keySerializer = keySerializer;
+        this.valSerializer = valSerializer;
+        this.keyDeserializer = keyDeserializer;
+        this.valDeserializer = valDeserializer;
+        this.parent = parent;
+    }
+
+    @Override
+    public KTable<K, V> filter(Predicate<K, V> predicate) {
+        String name = topology.newName(FILTER_NAME);
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, false);
+        topology.addProcessor(name, processorSupplier, this.name);
+
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+    }
+
+    @Override
+    public KTable<K, V> filterOut(final Predicate<K, V> predicate) {
+        String name = topology.newName(FILTER_NAME);
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, true);
+
+        topology.addProcessor(name, processorSupplier, this.name);
+
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+    }
+
+    @Override
+    public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+        String name = topology.newName(MAPVALUES_NAME);
+        KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(mapper);
+
+        topology.addProcessor(name, processorSupplier, this.name);
+
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+    }
+
+    @Override
+    public KTable<K, V> through(String topic,
+                                Serializer<K> keySerializer,
+                                Serializer<V> valSerializer,
+                                Deserializer<K> keyDeserializer,
+                                Deserializer<V> valDeserializer) {
+        String sendName = topology.newName(KStreamImpl.SINK_NAME);
+
+        topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+
+        return topology.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic);
+    }
+
+    @Override
+    public KTable<K, V> through(String topic) {
+        return through(topic, null, null, null, null);
+    }
+
+    @Override
+    public void to(String topic) {
+        String name = topology.newName(KStreamImpl.SINK_NAME);
+
+        topology.addSink(name, topic, this.name);
+    }
+
+    @Override
+    public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
+        String name = topology.newName(KStreamImpl.SINK_NAME);
+
+        topology.addSink(name, topic, keySerializer, valSerializer, this.name);
+    }
+
+    @Override
+    public KStream<K, V> toStream() {
+        String name = topology.newName(TOSTREAM_NAME);
+
+        topology.addProcessor(name, new KStreamPassThrough(), this.name);
+
+        return new KStreamImpl<>(topology, name, Collections.singleton(sourceNode));
+    }
+
+    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+        if (parent != null) {
+            return processorSupplier.view(parent.valueGetterSupplier());
+        } else {
+            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+            synchronized (source) {
+                if (!source.isMaterialized()) {
+                    StateStoreSupplier storeSupplier =
+                            new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+                    topology.addStateStore(storeSupplier, name);
+                    source.materialize();
+                }
+            }
+            return new KTableSourceValueGetterSupplier<>(topic);
+        }
+    }
+
+}
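
Note how valueGetterSupplier() materializes a source table lazily: the backing state
store, named after the source topic, is added to the topology only the first time a
downstream operator needs random access to the table's values, with the synchronized
block guarding the first use. Derived tables never materialize themselves; they
compose a view over their parent's value getter instead.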

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
new file mode 100644
index 0000000..0d14390
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
+
+    private final ValueMapper<V1, V2> mapper;
+
+    public KTableMapValues(ValueMapper<V1, V2> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor<K1, V1> get() {
+        return new KTableMapProcessor();
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K1, V2> view(KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier) {
+        return new KTableDerivedValueGetterSupplier<K1, V1, V2>(parentValueGetterSupplier) {
+
+            public KTableValueGetter<K1, V2> get() {
+                return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
+            }
+
+        };
+    }
+
+    private V2 computeNewValue(V1 value) {
+        V2 newValue = null;
+
+        if (value != null)
+            newValue = mapper.apply(value);
+
+        return newValue;
+    }
+
+    private class KTableMapProcessor extends AbstractProcessor<K1, V1> {
+
+        @Override
+        public void process(K1 key, V1 value) {
+            context().forward(key, computeNewValue(value));
+        }
+
+    }
+
+    private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {
+
+        private final KTableValueGetter<K1, V1> parentGetter;
+
+        public KTableMapValuesValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+            this.parentGetter = parentGetter;
+        }
+
+        public void init(ProcessorContext context) {
+            parentGetter.init(context);
+        }
+
+        public V2 get(K1 key) {
+            return computeNewValue(parentGetter.get(key));
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
new file mode 100644
index 0000000..cc6467f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public abstract class KTableProcessorSupplier<K, V, T> implements ProcessorSupplier<K, V> {
+
+    public abstract KTableValueGetterSupplier<K, T> view(KTableValueGetterSupplier<K, V> parentValueGetterFactory);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
new file mode 100644
index 0000000..93790ed
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableSource<K, V> extends KTableProcessorSupplier<K, V, V> {
+
+    private final String topic;
+
+    private boolean materialized = false;
+
+    public KTableSource(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor();
+    }
+
+    public void materialize() {
+        materialized = true;
+    }
+
+    public boolean isMaterialized() {
+        return materialized;
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+        throw new IllegalStateException("a view cannot be defined on the KTable source");
+    }
+
+    private class KTableSourceProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, value);
+        }
+    }
+
+    private class MaterializedKTableSourceProcessor extends AbstractProcessor<K, V> {
+
+        private KeyValueStore<K, V> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            store = (KeyValueStore<K, V>) context.getStateStore(topic);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            store.put(key, value);
+            context().forward(key, value);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
new file mode 100644
index 0000000..dab92d5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
+
+    private final String topic;
+
+    public KTableSourceValueGetterSupplier(String topic) {
+        this.topic = topic;
+    }
+
+    public KTableValueGetter<K, V> get() {
+        return new KTableSourceValueGetter();
+    }
+
+    private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
+
+        KeyValueStore<K, V> store = null;
+
+        @SuppressWarnings("unchecked")
+        public void init(ProcessorContext context) {
+            store = (KeyValueStore<K, V>) context.getStateStore(topic);
+        }
+
+        public V get(K key) {
+            return store.get(key);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
new file mode 100644
index 0000000..d07fc5d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.RocksDBStore;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A KTable state store supplier. The supplied store keeps all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final Serdes<K, V> serdes;
+    private final Time time;
+
+    protected KTableStoreSupplier(String name,
+                                  Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
+                                  Serializer<V> valSerializer, Deserializer<V> valDeserializer,
+                                  Time time) {
+        this.name = name;
+        this.serdes = new Serdes<>(name, keySerializer, keyDeserializer, valSerializer, valDeserializer);
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), serdes, "rocksdb-state", time).disableLogging();
+    }
+
+}
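
The supplier wraps a RocksDBStore in a MeteredKeyValueStore with change logging
disabled, presumably because the store is named after (and restored from) the
KTable's source topic, which already serves as its change log; the offset-limit
logic added to ProcessorStateManager below fits the same design.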

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
new file mode 100644
index 0000000..53ec6ba
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public interface KTableValueGetter<K, V> {
+
+    void init(ProcessorContext context);
+
+    V get(K key);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
new file mode 100644
index 0000000..1ab6ba6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public interface KTableValueGetterSupplier<K, V> {
+
+    KTableValueGetter<K, V> get();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 00b56b3..187c4ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -50,7 +50,7 @@ public abstract class PartitionGrouper {
         return partitionAssignor.taskIds(partition);
     }
 
-    public Set<TaskId> standbyTasks() {
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
         return partitionAssignor.standbyTasks();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 5344f6c..023bbbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.streams.processor;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class TaskId implements Comparable<TaskId> {
@@ -47,6 +50,15 @@ public class TaskId implements Comparable<TaskId> {
         }
     }
 
+    public void writeTo(DataOutputStream out) throws IOException {
+        out.writeInt(topicGroupId);
+        out.writeInt(partition);
+    }
+
+    public static TaskId readFrom(DataInputStream in) throws IOException {
+        return new TaskId(in.readInt(), in.readInt());
+    }
+
     public void writeTo(ByteBuffer buf) {
         buf.putInt(topicGroupId);
         buf.putInt(partition);
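
The new stream-based overloads make TaskId easy to embed in a serialized assignment,
presumably for the reworked AssignmentInfo encoding in this commit. A minimal
round-trip sketch, with illustrative ids and exception handling elided:

    // assumes java.io.* and org.apache.kafka.streams.processor.TaskId are imported
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new TaskId(1, 3).writeTo(new DataOutputStream(bytes));

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    TaskId restored = TaskId.readFrom(in); // topicGroupId == 1, partition == 3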

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 14037ab..e1b4d62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -28,31 +28,37 @@ import org.apache.kafka.streams.processor.TaskId;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 public abstract class AbstractTask {
     protected final TaskId id;
     protected final ProcessorTopology topology;
+    protected final Consumer consumer;
     protected final ProcessorStateManager stateMgr;
     protected final Set<TopicPartition> partitions;
     protected ProcessorContext processorContext;
 
     protected AbstractTask(TaskId id,
-                           Consumer<byte[], byte[]> restoreConsumer,
+                           Collection<TopicPartition> partitions,
                            ProcessorTopology topology,
+                           Consumer<byte[], byte[]> consumer,
+                           Consumer<byte[], byte[]> restoreConsumer,
                            StreamingConfig config,
-                           Set<TopicPartition> partitions) {
+                           boolean isStandby) {
         this.id = id;
+        this.partitions = new HashSet<>(partitions);
         this.topology = topology;
-        this.partitions = partitions;
+        this.consumer = consumer;
 
         // create the processor state manager
         try {
             File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
            // the isStandby flag indicates whether this is a standby task
-            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null);
+            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {
             throw new KafkaException("Error while creating the state manager", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 451b214..54d5567 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -48,7 +48,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
     private StreamThread streamThread;
     private int numStandbyReplicas;
     private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-    private Set<TaskId> standbyTasks;
+    private Map<TaskId, Set<TopicPartition>> standbyTasks;
 
     @Override
     public void configure(Map<String, ?> configs) {
@@ -154,28 +154,32 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
 
             final int numConsumers = consumers.size();
             List<TaskId> active = new ArrayList<>();
-            Set<TaskId> standby = new HashSet<>();
+            Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
 
             int i = 0;
             for (String consumer : consumers) {
-                List<TopicPartition> partitions = new ArrayList<>();
+                List<TopicPartition> activePartitions = new ArrayList<>();
 
                 final int numTaskIds = taskIds.size();
                 for (int j = i; j < numTaskIds; j += numConsumers) {
                     TaskId taskId = taskIds.get(j);
                     if (j < numActiveTasks) {
                         for (TopicPartition partition : partitionGroups.get(taskId)) {
-                            partitions.add(partition);
+                            activePartitions.add(partition);
                             active.add(taskId);
                         }
                     } else {
-                        // no partition to a standby task
-                        standby.add(taskId);
+                        Set<TopicPartition> standbyPartitions = standby.get(taskId);
+                        if (standbyPartitions == null) {
+                            standbyPartitions = new HashSet<>();
+                            standby.put(taskId, standbyPartitions);
+                        }
+                        standbyPartitions.addAll(partitionGroups.get(taskId));
                     }
                 }
 
                 AssignmentInfo data = new AssignmentInfo(active, standby);
-                assignment.put(consumer, new Assignment(partitions, data.encode()));
+                assignment.put(consumer, new Assignment(activePartitions, data.encode()));
                 i++;
 
                 active.clear();
@@ -220,7 +224,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         return partitionToTaskIds.get(partition);
     }
 
-    public Set<TaskId> standbyTasks() {
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
         return standbyTasks;
     }
 }
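
This assignor change is what makes standby replication usable: instead of a bare
Set<TaskId>, each standby task is now assigned the concrete TopicPartitions of its
task, so the receiving StreamThread can create StandbyTask instances that know
exactly which partitions to consume and keep up to date.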

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2a8df9e..4cff02d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -33,6 +33,7 @@ import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -52,6 +53,7 @@ public class ProcessorStateManager {
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final Map<TopicPartition, Long> restoredOffsets;
     private final Map<TopicPartition, Long> checkpointedOffsets;
+    private final Map<TopicPartition, Long> offsetLimits;
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
 
@@ -63,6 +65,7 @@ public class ProcessorStateManager {
         this.restoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
         this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
+        this.offsetLimits = new HashMap<>();
 
         // create the state directory for this task if missing (we won't create the parent directory)
         createStateDirectory(baseDir);
@@ -165,8 +168,10 @@ public class ProcessorStateManager {
 
             // restore its state from changelog records; while restoring the log end offset
             // should not change since it is only written by this thread.
+            long limit = offsetLimit(storePartition);
             while (true) {
                 for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
+                    if (record.offset() >= limit) break;
                     stateRestoreCallback.restore(record.key(), record.value());
                 }
 
@@ -178,7 +183,7 @@ public class ProcessorStateManager {
             }
 
             // record the restored offset for its change log partition
-            long newOffset = restoreConsumer.position(storePartition);
+            long newOffset = Math.min(limit, restoreConsumer.position(storePartition));
             restoredOffsets.put(storePartition, newOffset);
         } finally {
             // un-assign the change log partition
@@ -202,16 +207,40 @@ public class ProcessorStateManager {
         return partitionsAndOffsets;
     }
 
-    public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+    public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+        long limit = offsetLimit(storePartition);
+        List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
+
         // restore states from changelog records
         StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
 
+        long lastOffset = -1L;
+        int count = 0;
         for (ConsumerRecord<byte[], byte[]> record : records) {
-            restoreCallback.restore(record.key(), record.value());
+            if (record.offset() < limit) {
+                restoreCallback.restore(record.key(), record.value());
+                lastOffset = record.offset();
+            } else {
+                if (remainingRecords == null)
+                    remainingRecords = new ArrayList<>(records.size() - count);
+
+                remainingRecords.add(record);
+            }
+            count++;
         }
         // record the restored offset for its change log partition
-        long newOffset = restoreConsumer.position(storePartition);
-        restoredOffsets.put(storePartition, newOffset);
+        restoredOffsets.put(storePartition, lastOffset + 1);
+
+        return remainingRecords;
+    }
+
+    public void putOffsetLimit(TopicPartition partition, long limit) {
+        offsetLimits.put(partition, limit);
+    }
+
+    private long offsetLimit(TopicPartition partition) {
+        Long limit = offsetLimits.get(partition);
+        return limit != null ? limit : Long.MAX_VALUE;
     }
 
     public StateStore getStore(String name) {
@@ -253,14 +282,14 @@ public class ProcessorStateManager {
                 if (stores.get(storeName).persistent()) {
                     Long offset = ackedOffsets.get(part);
 
-                    if (offset == null) {
-                        // if no record was produced. we need to check the restored offset.
-                        offset = restoredOffsets.get(part);
-                    }
-
                     if (offset != null) {
                         // store the last offset + 1 (the log position after restoration)
                         checkpointOffsets.put(part, offset + 1);
+                    } else {
+                        // if no record was produced, we need to check the restored offset.
+                        offset = restoredOffsets.get(part);
+                        if (offset != null)
+                            checkpointOffsets.put(part, offset);
                     }
                 }
             }
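
The key idea in this file is that restoration is now bounded by an offset limit (the group's committed offset), so a standby never replays records the active task has not yet committed. A minimal sketch of that bounded-restore logic, with Rec and Callback as stand-ins for ConsumerRecord and StateRestoreCallback:

    import java.util.ArrayList;
    import java.util.List;

    // Apply records below the offset limit; hand the rest back to the caller.
    public class BoundedRestoreSketch {
        interface Callback { void restore(byte[] key, byte[] value); }
        static class Rec {
            final long offset; final byte[] key, value;
            Rec(long offset, byte[] key, byte[] value) {
                this.offset = offset; this.key = key; this.value = value;
            }
        }

        static List<Rec> update(List<Rec> records, long limit, Callback cb) {
            List<Rec> remaining = null;
            for (Rec r : records) {
                if (r.offset < limit) {
                    cb.restore(r.key, r.value);        // safe: below the commit limit
                } else {
                    if (remaining == null)
                        remaining = new ArrayList<>(); // buffered until the limit advances
                    remaining.add(r);
                }
            }
            return remaining;                          // null means everything was applied
        }
    }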

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index c6442d9..d0d8493 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
@@ -50,11 +51,13 @@ public class StandbyTask extends AbstractTask {
      * @param metrics               the {@link StreamingMetrics} created by the thread
      */
     public StandbyTask(TaskId id,
-                       Consumer<byte[], byte[]> restoreConsumer,
+                       Collection<TopicPartition> partitions,
                        ProcessorTopology topology,
+                       Consumer<byte[], byte[]> consumer,
+                       Consumer<byte[], byte[]> restoreConsumer,
                        StreamingConfig config,
                        StreamingMetrics metrics) {
-        super(id, restoreConsumer, topology, config, null);
+        super(id, partitions, topology, consumer, restoreConsumer, config, true);
 
         // initialize the topology with its own context
         this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
@@ -64,6 +67,9 @@ public class StandbyTask extends AbstractTask {
         ((StandbyContextImpl) this.processorContext).initialized();
 
         this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+
+        // set initial offset limits
+        initializeOffsetLimits();
     }
 
     public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -76,13 +82,24 @@ public class StandbyTask extends AbstractTask {
 
     /**
      * Updates a state store using records from one change log partition
+     * @return a list of records not consumed
      */
-    public void update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
-        stateMgr.updateStandbyStates(partition, records);
+    public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
+        return stateMgr.updateStandbyStates(partition, records);
     }
 
     public void commit() {
         stateMgr.flush();
+
+        // reinitialize offset limits
+        initializeOffsetLimits();
+    }
+
+    protected void initializeOffsetLimits() {
+        for (TopicPartition partition : partitions) {
+            OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
+            stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
+        }
     }
 
 }
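
A standby task derives its restore limits from the consumer group's committed offsets, as initializeOffsetLimits above does. A sketch of that derivation against the kafka-clients Consumer API, returning a plain map in place of stateMgr.putOffsetLimit (the method and class names here are illustrative):

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    public class OffsetLimitSketch {
        static Map<TopicPartition, Long> limitsFor(Consumer<byte[], byte[]> consumer,
                                                   Collection<TopicPartition> partitions) {
            Map<TopicPartition, Long> limits = new HashMap<>();
            for (TopicPartition partition : partitions) {
                OffsetAndMetadata metadata = consumer.committed(partition);
                // no commit yet means nothing is safe to replay
                limits.put(partition, metadata != null ? metadata.offset() : 0L);
            }
            return limits;
        }
    }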

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 16f0667..24c450e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -30,9 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -45,7 +43,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private final int maxBufferedSize;
 
-    private final Consumer consumer;
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
     private final PunctuationQueue punctuationQueue;
@@ -73,15 +70,14 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param metrics               the {@link StreamingMetrics} created by the thread
      */
     public StreamTask(TaskId id,
+                      Collection<TopicPartition> partitions,
+                      ProcessorTopology topology,
                       Consumer<byte[], byte[]> consumer,
                       Producer<byte[], byte[]> producer,
                       Consumer<byte[], byte[]> restoreConsumer,
-                      Collection<TopicPartition> partitions,
-                      ProcessorTopology topology,
                       StreamingConfig config,
                       StreamingMetrics metrics) {
-        super(id, restoreConsumer, topology, config, Collections.unmodifiableSet(new HashSet<>(partitions)));
-        this.consumer = consumer;
+        super(id, partitions, topology, consumer, restoreConsumer, config, false);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
@@ -98,7 +94,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
         this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
-        // initialize the consumed and produced offset cache
+        // initialize the consumed offset cache
         this.consumedOffsets = new HashMap<>();
 
         // create the record recordCollector that maintains the produced offsets
@@ -245,7 +241,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
         if (commitOffsetNeeded) {
             Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
             for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
-                consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
+                TopicPartition partition = entry.getKey();
+                long offset = entry.getValue() + 1;
+                consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
+                stateMgr.putOffsetLimit(partition, offset);
             }
             consumer.commitSync(consumedOffsetsAndMetadata);
             commitOffsetNeeded = false;
@@ -280,6 +279,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
     }
 
+    @Override
     public void close() {
         this.partitionGroup.close();
         this.consumedOffsets.clear();
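
The commit hunk above is the counterpart to the standby's offset limits: the committed position (last consumed offset + 1) is also recorded as the new limit. A sketch of that pairing, with the state manager reduced to a plain map (illustrative names):

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import java.util.HashMap;
    import java.util.Map;

    public class CommitSketch {
        static Map<TopicPartition, OffsetAndMetadata> toCommit(
                Map<TopicPartition, Long> consumed,
                Map<TopicPartition, Long> offsetLimits) {
            Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>(consumed.size());
            for (Map.Entry<TopicPartition, Long> entry : consumed.entrySet()) {
                long offset = entry.getValue() + 1;                       // next record to read
                result.put(entry.getKey(), new OffsetAndMetadata(offset));
                offsetLimits.put(entry.getKey(), offset);                 // standbys may replay up to here
            }
            return result;
        }
    }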

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 31dca39..c77a027 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -73,6 +74,7 @@ public class StreamThread extends Thread {
 
     protected final StreamingConfig config;
     protected final TopologyBuilder builder;
+    protected final Set<String> sourceTopics;
     protected final Producer<byte[], byte[]> producer;
     protected final Consumer<byte[], byte[]> consumer;
     protected final Consumer<byte[], byte[]> restoreConsumer;
@@ -94,6 +96,9 @@ public class StreamThread extends Thread {
     private long lastCommit;
     private long recordsProcessed;
 
+    private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
+    private boolean processStandbyRecords = false;
+
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -133,6 +138,7 @@ public class StreamThread extends Thread {
 
         this.config = config;
         this.builder = builder;
+        this.sourceTopics = builder.sourceTopics();
         this.clientId = clientId;
         this.clientUUID = clientUUID;
         this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -148,6 +154,9 @@ public class StreamThread extends Thread {
         this.standbyTasks = new HashMap<>();
         this.prevTasks = new HashSet<>();
 
+        // buffered records for standby (KTable) task updates
+        this.standbyRecords = new HashMap<>();
+
         // read in task specific config values
         this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
         this.stateDir.mkdir();
@@ -256,7 +265,7 @@ public class StreamThread extends Thread {
 
             ensureCopartitioning(builder.copartitionGroups());
 
-            consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
+            consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
 
             while (stillRunning()) {
                 // try to fetch some records if necessary
@@ -293,15 +302,12 @@ public class StreamThread extends Thread {
                     }
 
                     maybePunctuate();
-                    maybeCommit();
                 } else {
                     // even when no task is assigned, we must poll to get a task.
                     requiresPoll = true;
                 }
-
-                if (!standbyTasks.isEmpty()) {
-                    updateStandbyTasks();
-                }
+                maybeCommit();
+                maybeUpdateStandbyTasks();
 
                 maybeClean();
             }
@@ -310,13 +316,38 @@ public class StreamThread extends Thread {
         }
     }
 
-    private void updateStandbyTasks() {
-        ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+    private void maybeUpdateStandbyTasks() {
+        if (!standbyTasks.isEmpty()) {
+            if (processStandbyRecords) {
+                if (!standbyRecords.isEmpty()) {
+                    for (StandbyTask task : standbyTasks.values()) {
+                        for (TopicPartition partition : task.changeLogPartitions()) {
+                            List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition);
+                            if (remaining != null) {
+                                remaining = task.update(partition, remaining);
+                                if (remaining != null) {
+                                    standbyRecords.put(partition, remaining);
+                                } else {
+                                    restoreConsumer.resume(partition);
+                                }
+                            }
+                        }
+                    }
+                }
+                processStandbyRecords = false;
+            }
 
-        if (!records.isEmpty()) {
-            for (StandbyTask task : standbyTasks.values()) {
-                for (TopicPartition partition : task.changeLogPartitions()) {
-                    task.update(partition, records.records(partition));
+            ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+
+            if (!records.isEmpty()) {
+                for (StandbyTask task : standbyTasks.values()) {
+                    for (TopicPartition partition : task.changeLogPartitions()) {
+                        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
+                        if (remaining != null) {
+                            restoreConsumer.pause(partition);
+                            standbyRecords.put(partition, remaining);
+                        }
+                    }
                 }
             }
         }
@@ -359,6 +390,8 @@ public class StreamThread extends Thread {
 
             commitAll();
             lastCommit = now;
+
+            processStandbyRecords = true;
         } else {
             for (StreamTask task : activeTasks.values()) {
                 try {
@@ -478,12 +511,12 @@ public class StreamThread extends Thread {
         return tasks;
     }
 
-    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
+    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
-        return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
+        return new StreamTask(id, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -501,7 +534,7 @@ public class StreamThread extends Thread {
             }
         }
 
-        // create the tasks
+        // create the active tasks
         for (TaskId taskId : partitionsForTask.keySet()) {
             try {
                 activeTasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
@@ -510,8 +543,6 @@ public class StreamThread extends Thread {
                 throw e;
             }
         }
-
-        lastClean = time.milliseconds();
     }
 
     private void removeStreamTasks() {
@@ -537,13 +568,13 @@ public class StreamThread extends Thread {
         sensors.taskDestructionSensor.record();
     }
 
-    protected StandbyTask createStandbyTask(TaskId id) {
+    protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
-            return new StandbyTask(id, restoreConsumer, topology, config, sensors);
+            return new StandbyTask(id, partitions, topology, consumer, restoreConsumer, config, sensors);
         } else {
             return null;
         }
@@ -552,10 +583,15 @@ public class StreamThread extends Thread {
     private void addStandbyTasks() {
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
-        for (TaskId taskId : partitionGrouper.standbyTasks()) {
-            StandbyTask task = createStandbyTask(taskId);
+        // create the standby tasks
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionGrouper.standbyTasks().entrySet()) {
+            TaskId taskId = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+            StandbyTask task = createStandbyTask(taskId, partitions);
             if (task != null) {
                 standbyTasks.put(taskId, task);
+                // collect checkpointed offsets to position the restore consumer;
+                // this includes all partitions from which we restore state
                 checkpointedOffsets.putAll(task.checkpointedOffsets());
             }
         }
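
maybeUpdateStandbyTasks above implements a pause/buffer/resume cycle: records past the offset limit are buffered and the changelog partition paused; after the next commit the limits advance and the buffer is retried. A behavioral sketch with Task and ConsumerLike as stand-ins for StandbyTask and the restore consumer:

    import org.apache.kafka.common.TopicPartition;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StandbyBufferSketch {
        interface Task { List<byte[]> update(TopicPartition p, List<byte[]> records); }
        interface ConsumerLike { void pause(TopicPartition p); void resume(TopicPartition p); }

        final Map<TopicPartition, List<byte[]>> standbyRecords = new HashMap<>();

        void onPoll(Task task, ConsumerLike consumer, TopicPartition p, List<byte[]> polled) {
            List<byte[]> remaining = task.update(p, polled);
            if (remaining != null) {
                consumer.pause(p);                     // stop fetching past the limit
                standbyRecords.put(p, remaining);
            }
        }

        void afterCommit(Task task, ConsumerLike consumer, TopicPartition p) {
            List<byte[]> remaining = standbyRecords.remove(p);
            if (remaining != null) {
                remaining = task.update(p, remaining); // limits advanced at commit time
                if (remaining != null)
                    standbyRecords.put(p, remaining);  // still blocked; stay paused
                else
                    consumer.resume(p);                // caught up; fetch again
            }
        }
    }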

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index d82dd7d..2bd4457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -17,14 +17,23 @@
 
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.ByteBufferInputStream;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class AssignmentInfo {
@@ -33,70 +42,98 @@ public class AssignmentInfo {
 
     public final int version;
     public final List<TaskId> activeTasks; // each element corresponds to a partition
-    public final Set<TaskId> standbyTasks;
+    public final Map<TaskId, Set<TopicPartition>> standbyTasks;
 
-    public AssignmentInfo(List<TaskId> activeTasks, Set<TaskId> standbyTasks) {
+    public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
         this(1, activeTasks, standbyTasks);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Set<TaskId> standbyTasks) {
+    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
         this.version = version;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
     }
 
     public ByteBuffer encode() {
-        if (version == 1) {
-            ByteBuffer buf = ByteBuffer.allocate(4 + 4 + activeTasks.size() * 8 + 4 + standbyTasks.size() * 8);
-            // Encode version
-            buf.putInt(1);
-            // Encode active tasks
-            buf.putInt(activeTasks.size());
-            for (TaskId id : activeTasks) {
-                id.writeTo(buf);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(baos);
+
+        try {
+            if (version == 1) {
+                // Encode version
+                out.writeInt(1);
+                // Encode active tasks
+                out.writeInt(activeTasks.size());
+                for (TaskId id : activeTasks) {
+                    id.writeTo(out);
+                }
+                // Encode standby tasks
+                out.writeInt(standbyTasks.size());
+                for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+                    TaskId id = entry.getKey();
+                    id.writeTo(out);
+
+                    Set<TopicPartition> partitions = entry.getValue();
+                    out.writeInt(partitions.size());
+                    for (TopicPartition partition : partitions) {
+                        out.writeUTF(partition.topic());
+                        out.writeInt(partition.partition());
+                    }
+                }
+
+                out.flush();
+                out.close();
+
+                return ByteBuffer.wrap(baos.toByteArray());
+
+            } else {
+                TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
+                log.error(ex.getMessage(), ex);
+                throw ex;
             }
-            // Encode standby tasks
-            buf.putInt(standbyTasks.size());
-            for (TaskId id : standbyTasks) {
-                id.writeTo(buf);
-            }
-            buf.rewind();
-
-            return buf;
-
-        } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+        } catch (IOException ex) {
+            throw new KafkaException("failed to encode AssignmentInfo", ex);
         }
     }
 
     public static AssignmentInfo decode(ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
-
-        // Decode version
-        int version = data.getInt();
-        if (version == 1) {
-           // Decode active tasks
-            int count = data.getInt();
-            List<TaskId> activeTasks = new ArrayList<>(count);
-            for (int i = 0; i < count; i++) {
-                activeTasks.add(TaskId.readFrom(data));
+        DataInputStream in = new DataInputStream(new ByteBufferInputStream(data));
+
+        try {
+            // Decode version
+            int version = in.readInt();
+            if (version == 1) {
+                // Decode active tasks
+                int count = in.readInt();
+                List<TaskId> activeTasks = new ArrayList<>(count);
+                for (int i = 0; i < count; i++) {
+                    activeTasks.add(TaskId.readFrom(in));
+                }
+                // Decode standby tasks
+                count = in.readInt();
+                Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
+                for (int i = 0; i < count; i++) {
+                    TaskId id = TaskId.readFrom(in);
+
+                    int numPartitions = in.readInt();
+                    Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+                    for (int j = 0; j < numPartitions; j++) {
+                        partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
+                    }
+                    standbyTasks.put(id, partitions);
+                }
+
+                return new AssignmentInfo(activeTasks, standbyTasks);
+
+            } else {
+                TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
+                log.error(ex.getMessage(), ex);
+                throw ex;
             }
-            // Decode standby tasks
-            count = data.getInt();
-            Set<TaskId> standbyTasks = new HashSet<>(count);
-            for (int i = 0; i < count; i++) {
-                standbyTasks.add(TaskId.readFrom(data));
-            }
-
-            return new AssignmentInfo(activeTasks, standbyTasks);
-
-        } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+        } catch (IOException ex) {
+            throw new KafkaException("failed to decode AssignmentInfo", ex);
         }
     }
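
Since the version-1 payload now uses DataOutputStream primitives (ints plus UTF topic names) instead of a fixed-size ByteBuffer, a tiny round-trip sketch may help when debugging assignment data. Task ids are reduced to single ints here purely for illustration:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class AssignmentInfoWireSketch {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(1);          // version
            out.writeInt(0);          // no active tasks in this toy example
            out.writeInt(1);          // one standby task ...
            out.writeInt(7);          //   ... its (simplified) task id
            out.writeInt(1);          //   ... with one changelog partition
            out.writeUTF("topic1");
            out.writeInt(0);
            out.flush();

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
            System.out.println("version=" + in.readInt());
            System.out.println("active count=" + in.readInt());
            in.readInt();             // standby count
            System.out.println("standby task " + in.readInt()
                    + " partitions=" + in.readInt()
                    + " first=" + in.readUTF() + "-" + in.readInt());
        }
    }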
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
new file mode 100644
index 0000000..2ad1f47
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class KeyValueStoreChangeLogger<K, V> {
+
+    protected final Serdes<K, V> serialization;
+
+    private final Set<K> dirty;
+    private final Set<K> removed;
+    private final int maxDirty;
+    private final int maxRemoved;
+
+    private final String topic;
+    private int partition;
+    private ProcessorContext context;
+
+    // always wrap the logged store with the metered store
+    public KeyValueStoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+        this.topic = topic;
+        this.serialization = serialization;
+        this.context = context;
+        this.partition = context.id().partition;
+
+        this.dirty = new HashSet<>();
+        this.removed = new HashSet<>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
+    }
+
+    public void add(K key) {
+        this.dirty.add(key);
+        this.removed.remove(key);
+    }
+
+    public void delete(K key) {
+        this.dirty.remove(key);
+        this.removed.add(key);
+    }
+
+    public void maybeLogChange(KeyValueStore<K, V> kv) {
+        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+            logChange(kv);
+    }
+
+    public void logChange(KeyValueStore<K, V> kv) {
+        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+        if (collector != null) {
+            Serializer<K> keySerializer = serialization.keySerializer();
+            Serializer<V> valueSerializer = serialization.valueSerializer();
+
+            for (K k : this.removed) {
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+            }
+            for (K k : this.dirty) {
+                V v = kv.get(k);
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+            }
+            this.removed.clear();
+            this.dirty.clear();
+        }
+    }
+
+}
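
The change logger batches writes: puts mark a key dirty, deletes mark it removed, and nothing hits the changelog until either set grows past a cap (hard-coded to 100 in the patch, see the TODOs). A behavioral sketch with Flusher standing in for the RecordCollector send calls:

    import java.util.HashSet;
    import java.util.Set;

    public class ChangeLoggerSketch<K> {
        interface Flusher<K> { void send(K key, boolean deleted); }

        private final Set<K> dirty = new HashSet<>();
        private final Set<K> removed = new HashSet<>();
        private final int maxDirty = 100;   // mirrors the patch's hard-coded cap
        private final Flusher<K> flusher;

        ChangeLoggerSketch(Flusher<K> flusher) { this.flusher = flusher; }

        void add(K key)    { dirty.add(key); removed.remove(key); }
        void delete(K key) { dirty.remove(key); removed.add(key); }

        void maybeLogChange() {
            if (dirty.size() > maxDirty || removed.size() > maxDirty)
                logChange();
        }

        void logChange() {
            for (K k : removed) flusher.send(k, true);   // tombstones first
            for (K k : dirty)   flusher.send(k, false);  // then latest values
            removed.clear();
            dirty.clear();
        }
    }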