You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/04 23:59:30 UTC

[1/2] kafka git commit: KAFKA-2856: Add KTable non-stateful APIs along with standby task support

Repository: kafka
Updated Branches:
  refs/heads/trunk cd54fc881 -> 39c3512ec


http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index b68f763..8aed6b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -21,16 +21,11 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
@@ -39,8 +34,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     protected final String metricGrp;
     protected final Time time;
 
-    private final String topic;
-
     private Sensor putTime;
     private Sensor getTime;
     private Sensor deleteTime;
@@ -51,26 +44,20 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private Sensor restoreTime;
     private StreamingMetrics metrics;
 
-    private final Set<K> dirty;
-    private final Set<K> removed;
-    private final int maxDirty;
-    private final int maxRemoved;
-
-    private int partition;
-    private ProcessorContext context;
+    private boolean loggingEnabled = true;
+    private KeyValueStoreChangeLogger<K, V> changeLogger = null;
 
-    // always wrap the logged store with the metered store
+    // always wrap the store with the metered store
     public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
         this.inner = inner;
         this.serialization = serialization;
         this.metricGrp = metricGrp;
         this.time = time != null ? time : new SystemTime();
-        this.topic = inner.name();
+    }
 
-        this.dirty = new HashSet<K>();
-        this.removed = new HashSet<K>();
-        this.maxDirty = 100; // TODO: this needs to be configurable
-        this.maxRemoved = 100; // TODO: this needs to be configurable
+    public MeteredKeyValueStore<K, V> disableLogging() {
+        loggingEnabled = false;
+        return this;
     }
 
     @Override
@@ -80,7 +67,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public void init(ProcessorContext context) {
-        String name = name();
+        final String name = name();
         this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
         this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
@@ -92,8 +79,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
 
         serialization.init(context);
-        this.context = context;
-        this.partition = context.id().partition;
+        this.changeLogger = this.loggingEnabled ? new KeyValueStoreChangeLogger<>(name, context, serialization) : null;
 
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
@@ -105,8 +91,8 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             context.register(this, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
-                    inner.put(keyDeserializer.deserialize(topic, key),
-                            valDeserializer.deserialize(topic, value));
+                    inner.put(keyDeserializer.deserialize(name, key),
+                            valDeserializer.deserialize(name, value));
                 }
             });
         } finally {
@@ -135,9 +121,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             this.inner.put(key, value);
 
-            this.dirty.add(key);
-            this.removed.remove(key);
-            maybeLogChange();
+            if (loggingEnabled) {
+                changeLogger.add(key);
+                changeLogger.maybeLogChange(this.inner);
+            }
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
@@ -149,13 +136,13 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             this.inner.putAll(entries);
 
-            for (Entry<K, V> entry : entries) {
-                K key = entry.key();
-                this.dirty.add(key);
-                this.removed.remove(key);
+            if (loggingEnabled) {
+                for (Entry<K, V> entry : entries) {
+                    K key = entry.key();
+                    changeLogger.add(key);
+                }
+                changeLogger.maybeLogChange(this.inner);
             }
-
-            maybeLogChange();
         } finally {
             this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
         }
@@ -167,9 +154,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             V value = this.inner.delete(key);
 
-            this.dirty.remove(key);
-            this.removed.add(key);
-            maybeLogChange();
+            removed(key);
 
             return value;
         } finally {
@@ -179,14 +164,15 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     /**
      * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
-     * store other than {@link #delete(Object)}.
+     * store.
      *
      * @param key the key for the entry that the inner store removed
      */
     protected void removed(K key) {
-        this.dirty.remove(key);
-        this.removed.add(key);
-        maybeLogChange();
+        if (loggingEnabled) {
+            changeLogger.delete(key);
+            changeLogger.maybeLogChange(this.inner);
+        }
     }
 
     @Override
@@ -209,35 +195,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         long startNs = time.nanoseconds();
         try {
             this.inner.flush();
-            logChange();
+
+            if (loggingEnabled)
+                changeLogger.logChange(this.inner);
         } finally {
             this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
         }
     }
 
-    private void maybeLogChange() {
-        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
-            logChange();
-    }
-
-    private void logChange() {
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        if (collector != null) {
-            Serializer<K> keySerializer = serialization.keySerializer();
-            Serializer<V> valueSerializer = serialization.valueSerializer();
-
-            for (K k : this.removed) {
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
-            }
-            for (K k : this.dirty) {
-                V v = this.inner.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
-            }
-            this.removed.clear();
-            this.dirty.clear();
-        }
-    }
-
     private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
 
         private final KeyValueIterator<K1, V1> iter;

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
index f1fbd9f..41314b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -17,25 +17,9 @@
 
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
 
 /**
  * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
@@ -43,7 +27,7 @@ import java.util.NoSuchElementException;
  * @param <K> the type of keys
  * @param <V> the type of values
  *
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ * @see Stores#create(String)
  */
 public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
@@ -62,239 +46,7 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<K, V>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+        return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
     }
 
-    private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
-        private static final int TTL_NOT_USED = -1;
-
-        // TODO: these values should be configurable
-        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
-        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
-        private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
-        private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
-        private static final long BLOCK_SIZE = 4096L;
-        private static final int TTL_SECONDS = TTL_NOT_USED;
-        private static final int MAX_WRITE_BUFFERS = 3;
-        private static final String DB_FILE_DIR = "rocksdb";
-
-        private final Serdes<K, V> serdes;
-        private final String topic;
-
-        private final Options options;
-        private final WriteOptions wOptions;
-        private final FlushOptions fOptions;
-
-        private ProcessorContext context;
-        private int partition;
-        private String dbName;
-        private String dirName;
-        private RocksDB db;
-
-        public RocksDBStore(String name, Serdes<K, V> serdes) {
-            this.topic = name;
-            this.serdes = serdes;
-
-            // initialize the rocksdb options
-            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
-            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
-            tableConfig.setBlockSize(BLOCK_SIZE);
-
-            options = new Options();
-            options.setTableFormatConfig(tableConfig);
-            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
-            options.setCompressionType(COMPRESSION_TYPE);
-            options.setCompactionStyle(COMPACTION_STYLE);
-            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
-            options.setCreateIfMissing(true);
-            options.setErrorIfExists(false);
-
-            wOptions = new WriteOptions();
-            wOptions.setDisableWAL(true);
-
-            fOptions = new FlushOptions();
-            fOptions.setWaitForFlush(true);
-        }
-
-        public void init(ProcessorContext context) {
-            serdes.init(context);
-
-            this.context = context;
-            this.partition = context.id().partition;
-            this.dbName = this.topic + "." + this.partition;
-            this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
-            this.db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
-        }
-
-        private RocksDB openDB(File dir, Options options, int ttl) {
-            try {
-                if (ttl == TTL_NOT_USED) {
-                    dir.getParentFile().mkdirs();
-                    return RocksDB.open(options, dir.toString());
-                } else {
-                    throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
-                    // TODO: support TTL with change log?
-                    // return TtlDB.open(options, dir.toString(), ttl, false);
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
-            }
-        }
-
-        @Override
-        public String name() {
-            return this.topic;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            try {
-                return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void put(K key, V value) {
-            try {
-                if (value == null) {
-                    db.remove(wOptions, serdes.rawKey(key));
-                } else {
-                    db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
-                }
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = get(key);
-            put(key, null);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            RocksIterator innerIter = db.newIterator();
-            innerIter.seekToFirst();
-            return new RocksDbIterator<K, V>(innerIter, serdes);
-        }
-
-        @Override
-        public void flush() {
-            try {
-                db.flush(fOptions);
-            } catch (RocksDBException e) {
-                // TODO: this needs to be handled more accurately
-                throw new KafkaException("Error while executing flush from store " + this.topic, e);
-            }
-        }
-
-        @Override
-        public void close() {
-            flush();
-            db.close();
-        }
-
-        private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
-            private final RocksIterator iter;
-            private final Serdes<K, V> serdes;
-
-            public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
-                this.iter = iter;
-                this.serdes = serdes;
-            }
-
-            protected byte[] peekRawKey() {
-                return iter.key();
-            }
-
-            protected Entry<K, V> getEntry() {
-                return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.isValid();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                Entry<K, V> entry = this.getEntry();
-                iter.next();
-                return entry;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("RocksDB iterator does not support remove");
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-
-        private static class LexicographicComparator implements Comparator<byte[]> {
-
-            @Override
-            public int compare(byte[] left, byte[] right) {
-                for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                    int leftByte = left[i] & 0xff;
-                    int rightByte = right[j] & 0xff;
-                    if (leftByte != rightByte) {
-                        return leftByte - rightByte;
-                    }
-                }
-                return left.length - right.length;
-            }
-        }
-
-        private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
-            // RocksDB's JNI interface does not expose getters/setters that allow the
-            // comparator to be pluggable, and the default is lexicographic, so it's
-            // safe to just force lexicographic comparator here for now.
-            private final Comparator<byte[]> comparator = new LexicographicComparator();
-            byte[] rawToKey;
-
-            public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
-                    K from, K to) {
-                super(iter, serdes);
-                iter.seek(serdes.rawKey(from));
-                this.rawToKey = serdes.rawKey(to);
-            }
-
-            @Override
-            public boolean hasNext() {
-                return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
-            }
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
new file mode 100644
index 0000000..40ca9f5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+    private static final int TTL_NOT_USED = -1;
+
+    // TODO: these values should be configurable
+    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+    private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+    private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+    private static final long BLOCK_SIZE = 4096L;
+    private static final int TTL_SECONDS = TTL_NOT_USED;
+    private static final int MAX_WRITE_BUFFERS = 3;
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private final String topic;
+
+    private final Options options;
+    private final WriteOptions wOptions;
+    private final FlushOptions fOptions;
+
+    private Serdes<K, V> serdes;
+    private ProcessorContext context;
+    private String dbName;
+    private String dirName;
+    private RocksDB db;
+
+    public RocksDBStore(String name, Serdes<K, V> serdes) {
+        this.topic = name;
+        this.serdes = serdes;
+
+        // initialize the rocksdb options
+        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+        tableConfig.setBlockSize(BLOCK_SIZE);
+
+        options = new Options();
+        options.setTableFormatConfig(tableConfig);
+        options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+        options.setCompressionType(COMPRESSION_TYPE);
+        options.setCompactionStyle(COMPACTION_STYLE);
+        options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+        options.setCreateIfMissing(true);
+        options.setErrorIfExists(false);
+
+        wOptions = new WriteOptions();
+        wOptions.setDisableWAL(true);
+
+        fOptions = new FlushOptions();
+        fOptions.setWaitForFlush(true);
+    }
+
+    public void init(ProcessorContext context) {
+        serdes.init(context);
+
+        this.context = context;
+        this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
+        this.db = openDB(new File(this.dirName, this.topic), this.options, TTL_SECONDS);
+    }
+
+    private RocksDB openDB(File dir, Options options, int ttl) {
+        try {
+            if (ttl == TTL_NOT_USED) {
+                dir.getParentFile().mkdirs();
+                return RocksDB.open(options, dir.toString());
+            } else {
+                throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+                // TODO: support TTL with change log?
+                // return TtlDB.open(options, dir.toString(), ttl, false);
+            }
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return this.topic;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public V get(K key) {
+        try {
+            return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        try {
+            if (value == null) {
+                db.remove(wOptions, serdes.rawKey(key));
+            } else {
+                db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+            }
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+        }
+    }
+
+    @Override
+    public void putAll(List<Entry<K, V>> entries) {
+        for (Entry<K, V> entry : entries)
+            put(entry.key(), entry.value());
+    }
+
+    @Override
+    public V delete(K key) {
+        V value = get(key);
+        put(key, null);
+        return value;
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        RocksIterator innerIter = db.newIterator();
+        innerIter.seekToFirst();
+        return new RocksDbIterator<K, V>(innerIter, serdes);
+    }
+
+    @Override
+    public void flush() {
+        try {
+            db.flush(fOptions);
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing flush from store " + this.topic, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        flush();
+        db.close();
+    }
+
+    private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+        private final RocksIterator iter;
+        private final Serdes<K, V> serdes;
+
+        public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+            this.iter = iter;
+            this.serdes = serdes;
+        }
+
+        protected byte[] peekRawKey() {
+            return iter.key();
+        }
+
+        protected Entry<K, V> getEntry() {
+            return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.isValid();
+        }
+
+        @Override
+        public Entry<K, V> next() {
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            Entry<K, V> entry = this.getEntry();
+            iter.next();
+            return entry;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+        }
+
+        @Override
+        public void close() {
+        }
+
+    }
+
+    private static class LexicographicComparator implements Comparator<byte[]> {
+
+        @Override
+        public int compare(byte[] left, byte[] right) {
+            for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+                int leftByte = left[i] & 0xff;
+                int rightByte = right[j] & 0xff;
+                if (leftByte != rightByte) {
+                    return leftByte - rightByte;
+                }
+            }
+            return left.length - right.length;
+        }
+    }
+
+    private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force a lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = new LexicographicComparator();
+        byte[] rawToKey;
+
+        public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+                                    K from, K to) {
+            super(iter, serdes);
+            iter.seek(serdes.rawKey(from));
+            this.rawToKey = serdes.rawKey(to);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index f41d928..4e1b05a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-final class Serdes<K, V> {
+public final class Serdes<K, V> {
 
     public static <K, V> Serdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
         Serializer<K> keySerializer = serializer(keyClass);
@@ -73,6 +73,7 @@ final class Serdes<K, V> {
      * @param valueSerializer the serializer for values; may be null
      * @param valueDeserializer the deserializer for values; may be null
      */
+    @SuppressWarnings("unchecked")
     public Serdes(String topic,
             Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
             Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
new file mode 100644
index 0000000..590995b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableFilterTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+    private final Serializer<Integer> intSerializer = new IntegerSerializer();
+    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
+
+    @Test
+    public void testKTable() {
+        // Sends odd, even and null values through an "even only" filter and checks what the downstream processor receives.
+        final String sourceTopic = "topic1";
+        final KStreamBuilder topology = new KStreamBuilder();
+
+        final Predicate<String, Integer> isEven = new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        };
+
+        KTable<String, Integer> source = topology.table(strSerializer, intSerializer, strDeserializer, intDeserializer, sourceTopic);
+        KTable<String, Integer> evens = source.filter(isEven);
+
+        MockProcessorSupplier<String, Integer> captured = new MockProcessorSupplier<>();
+        evens.toStream().process(captured);
+
+        KStreamTestDriver testDriver = new KStreamTestDriver(topology);
+
+        testDriver.process(sourceTopic, "A", 1);
+        testDriver.process(sourceTopic, "B", 2);
+        testDriver.process(sourceTopic, "C", 3);
+        testDriver.process(sourceTopic, "D", 4);
+        testDriver.process(sourceTopic, "A", null);
+        testDriver.process(sourceTopic, "B", null);
+
+        assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"), captured.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        // Reads the filtered table through its value getter after each batch of updates; only even values should be visible.
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+
+            KTableImpl<String, Integer, Integer> table1 =
+                    (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+            KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+
+            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            getter2.init(driver.context());
+            // all values odd: the getter must not expose any key
+            driver.process(topic1, "A", 1);
+            driver.process(topic1, "B", 1);
+            driver.process(topic1, "C", 1);
+
+            assertNull(getter2.get("A"));
+            assertNull(getter2.get("B"));
+            assertNull(getter2.get("C"));
+            // A and B are updated to an even value
+            driver.process(topic1, "A", 2);
+            driver.process(topic1, "B", 2);
+
+            assertEquals(Integer.valueOf(2), getter2.get("A"));
+            assertEquals(Integer.valueOf(2), getter2.get("B"));
+            assertNull(getter2.get("C"));
+            // A becomes odd again and must no longer be readable; B keeps its value
+            driver.process(topic1, "A", 3);
+
+            assertNull(getter2.get("A"));
+            assertEquals(Integer.valueOf(2), getter2.get("B"));
+            assertNull(getter2.get("C"));
+            // null values for A and B: both must read back as null
+            driver.process(topic1, "A", null);
+            driver.process(topic1, "B", null);
+
+            assertNull(getter2.get("A"));
+            assertNull(getter2.get("B"));
+            assertNull(getter2.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
new file mode 100644
index 0000000..56c5703
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableImplTest {
+
+    @Test
+    public void testKTable() {
+        // Chains source -> mapValues -> filter, adds a through() branch, and checks the records each table forwards downstream.
+        final Serializer<String> serializer = new StringSerializer();
+        final Deserializer<String> deserializer = new StringDeserializer();
+        final KStreamBuilder builder = new KStreamBuilder();
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        KTable<String, String> table1 = builder.table(serializer, serializer, deserializer, deserializer, topic1);
+
+        MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
+        table1.toStream().process(proc1);
+        // table2: the String values of table1 parsed into Integers
+        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+            @Override
+            public Integer apply(String value) {
+                return Integer.valueOf(value);
+            }
+        });
+
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+        // table3: only the even values of table2
+        KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return (value % 2) == 0;
+            }
+        });
+
+        MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
+        table3.toStream().process(proc3);
+        // table4: table1 sent through topic2
+        KTable<String, String> table4 = table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+        MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
+        table4.toStream().process(proc4);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "02");
+        driver.process(topic1, "C", "03");
+        driver.process(topic1, "D", "04");
+
+        assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc1.processed);
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+        assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), proc3.processed);
+        assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc4.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        // After each batch of updates, reads all four tables (source, mapped, filtered, through) via their value getters.
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+            String topic2 = "topic2";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return Integer.valueOf(value);
+                        }
+                    });
+            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                    table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            getter1.init(driver.context());
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            getter2.init(driver.context());
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+            getter3.init(driver.context());
+            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+            getter4.init(driver.context());
+            // initial values are all odd, so the filtered table (getter3) must expose nothing
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(Integer.valueOf(1), getter2.get("A"));
+            assertEquals(Integer.valueOf(1), getter2.get("B"));
+            assertEquals(Integer.valueOf(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+            // A and B are updated to an even value
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(Integer.valueOf(2), getter2.get("A"));
+            assertEquals(Integer.valueOf(2), getter2.get("B"));
+            assertEquals(Integer.valueOf(1), getter2.get("C"));
+
+            assertEquals(Integer.valueOf(2), getter3.get("A"));
+            assertEquals(Integer.valueOf(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+            // A becomes odd again and must drop out of the filtered table only
+            driver.process(topic1, "A", "03");
+
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(Integer.valueOf(3), getter2.get("A"));
+            assertEquals(Integer.valueOf(2), getter2.get("B"));
+            assertEquals(Integer.valueOf(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(Integer.valueOf(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+            // a null value for A: every getter must now return null for A
+            driver.process(topic1, "A", null);
+
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertNull(getter2.get("A"));
+            assertEquals(Integer.valueOf(2), getter2.get("B"));
+            assertEquals(Integer.valueOf(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(Integer.valueOf(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
new file mode 100644
index 0000000..1ca6643
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableMapValuesImplTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    @Test
+    public void testKTable() {
+        // Maps the String values of a source table to Integers and checks the records forwarded by the mapped table.
+        final KStreamBuilder builder = new KStreamBuilder();
+        String topic1 = "topic1";
+
+        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+            @Override
+            public Integer apply(String value) {
+                return Integer.valueOf(value);
+            }
+        });
+
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "02");
+        driver.process(topic1, "C", "03");
+        driver.process(topic1, "D", "04");
+
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        File stateDir = Files.createTempDirectory("test").toFile();
+        try {
+            // Uses the class-level String serializer/deserializer fields, like testKTable, instead of a duplicate local pair.
+            // After each batch of updates, all four tables (source, mapped, filtered, through) are read via their value getters.
+            final KStreamBuilder builder = new KStreamBuilder();
+
+            String topic1 = "topic1";
+            String topic2 = "topic2";
+
+            KTableImpl<String, String, String> table1 =
+                    (KTableImpl<String, String, String>) builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                    new ValueMapper<String, Integer>() {
+                        @Override
+                        public Integer apply(String value) {
+                            return Integer.valueOf(value);
+                        }
+                    });
+            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                    new Predicate<String, Integer>() {
+                        @Override
+                        public boolean test(String key, Integer value) {
+                            return (value % 2) == 0;
+                        }
+                    });
+            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                    table1.through(topic2, strSerializer, strSerializer, strDeserializer, strDeserializer);
+
+            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            getter1.init(driver.context());
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            getter2.init(driver.context());
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+            getter3.init(driver.context());
+            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+            getter4.init(driver.context());
+            // initial values are all odd, so the filtered table (getter3) must expose nothing
+            driver.process(topic1, "A", "01");
+            driver.process(topic1, "B", "01");
+            driver.process(topic1, "C", "01");
+
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(Integer.valueOf(1), getter2.get("A"));
+            assertEquals(Integer.valueOf(1), getter2.get("B"));
+            assertEquals(Integer.valueOf(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+            // A and B are updated to an even value
+            driver.process(topic1, "A", "02");
+            driver.process(topic1, "B", "02");
+
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(Integer.valueOf(2), getter2.get("A"));
+            assertEquals(Integer.valueOf(2), getter2.get("B"));
+            assertEquals(Integer.valueOf(1), getter2.get("C"));
+
+            assertEquals(Integer.valueOf(2), getter3.get("A"));
+            assertEquals(Integer.valueOf(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+            // A becomes odd again and must drop out of the filtered table only
+            driver.process(topic1, "A", "03");
+
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(Integer.valueOf(3), getter2.get("A"));
+            assertEquals(Integer.valueOf(2), getter2.get("B"));
+            assertEquals(Integer.valueOf(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(Integer.valueOf(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+            // a null value for A: every getter must now return null for A
+            driver.process(topic1, "A", null);
+
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertNull(getter2.get("A"));
+            assertEquals(Integer.valueOf(2), getter2.get("B"));
+            assertEquals(Integer.valueOf(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(Integer.valueOf(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+        } finally {
+            Utils.delete(stateDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
new file mode 100644
index 0000000..97aca3d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableSourceTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    @Test
+    public void testKTable() {
+        // Feeds String values (matching the table's declared value type) and checks that every update, including nulls, is forwarded.
+        final KStreamBuilder builder = new KStreamBuilder();
+        String topic1 = "topic1";
+
+        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+        MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
+        table1.toStream().process(proc1);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        driver.process(topic1, "A", "1");
+        driver.process(topic1, "B", "2");
+        driver.process(topic1, "C", "3");
+        driver.process(topic1, "D", "4");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
+
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), proc1.processed);
+    }
+
+    @Test
+    public void testValueGetter() throws IOException {
+        // Reads the source table through its value getter after each batch of updates, ending with null values for A and B.
+        final File tmpStateDir = Files.createTempDirectory("test").toFile();
+        try {
+            final String sourceTopic = "topic1";
+            final KStreamBuilder topology = new KStreamBuilder();
+
+            KTable<String, String> rawTable = topology.table(strSerializer, strSerializer, strDeserializer, strDeserializer, sourceTopic);
+            KTableImpl<String, String, String> source = (KTableImpl<String, String, String>) rawTable;
+
+            KTableValueGetterSupplier<String, String> supplier = source.valueGetterSupplier();
+
+            KStreamTestDriver testDriver = new KStreamTestDriver(topology, tmpStateDir, null, null, null, null);
+
+            KTableValueGetter<String, String> getter = supplier.get();
+            getter.init(testDriver.context());
+
+            testDriver.process(sourceTopic, "A", "01");
+            testDriver.process(sourceTopic, "B", "01");
+            testDriver.process(sourceTopic, "C", "01");
+
+            assertEquals("01", getter.get("A"));
+            assertEquals("01", getter.get("B"));
+            assertEquals("01", getter.get("C"));
+
+            testDriver.process(sourceTopic, "A", "02");
+            testDriver.process(sourceTopic, "B", "02");
+
+            assertEquals("02", getter.get("A"));
+            assertEquals("02", getter.get("B"));
+            assertEquals("01", getter.get("C"));
+
+            testDriver.process(sourceTopic, "A", "03");
+
+            assertEquals("03", getter.get("A"));
+            assertEquals("02", getter.get("B"));
+            assertEquals("01", getter.get("C"));
+
+            testDriver.process(sourceTopic, "A", null);
+            testDriver.process(sourceTopic, "B", null);
+
+            assertNull(getter.get("A"));
+            assertNull(getter.get("B"));
+            assertEquals("01", getter.get("C"));
+
+        } finally {
+            Utils.delete(tmpStateDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
index aa484fc..43ffa7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
@@ -61,6 +61,8 @@ public class KafkaStreamingPartitionAssignorTest {
     private TopicPartition t2p2 = new TopicPartition("topic2", 2);
     private TopicPartition t2p3 = new TopicPartition("topic2", 3);
 
+    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
+
     private List<PartitionInfo> infos = Arrays.asList(
             new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
@@ -203,47 +205,26 @@ public class KafkaStreamingPartitionAssignorTest {
         assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
 
         // check assignment info
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        AssignmentInfo info;
-
-        List<TaskId> activeTasks = new ArrayList<>();
-        for (TopicPartition partition : assignments.get("consumer10").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
-        assertEquals(0, info.standbyTasks.size());
 
-        allActiveTasks.addAll(info.activeTasks);
+        Set<TaskId> allActiveTasks = new HashSet<>();
 
-        activeTasks.clear();
-        for (TopicPartition partition : assignments.get("consumer11").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
-        assertEquals(0, info.standbyTasks.size());
+        // the first consumer
+        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
 
-        allActiveTasks.addAll(info.activeTasks);
+        // the second consumer
+        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        allActiveTasks.addAll(info11.activeTasks);
 
         // check active tasks assigned to the first client
         assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
 
-        activeTasks.clear();
-        for (TopicPartition partition : assignments.get("consumer20").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
-        assertEquals(0, info.standbyTasks.size());
+        // the third consumer
+        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        allActiveTasks.addAll(info20.activeTasks);
 
-        allActiveTasks.addAll(info.activeTasks);
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, new HashSet<>(allActiveTasks));
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -266,6 +247,7 @@ public class KafkaStreamingPartitionAssignorTest {
         List<String> topics = Utils.mkList("topic1", "topic2");
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
+
         final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
         final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
         final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
@@ -291,55 +273,29 @@ public class KafkaStreamingPartitionAssignorTest {
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
-        // check assigned partitions
-
-        assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
-                Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
-        assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
-
-        // check assignment info
         Set<TaskId> allActiveTasks = new HashSet<>();
         Set<TaskId> allStandbyTasks = new HashSet<>();
-        AssignmentInfo info;
 
-        List<TaskId> activeTasks = new ArrayList<>();
-        for (TopicPartition partition : assignments.get("consumer10").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
+        // the first consumer
+        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+        allStandbyTasks.addAll(info10.standbyTasks.keySet());
 
-        allActiveTasks.addAll(info.activeTasks);
-        allStandbyTasks.addAll(info.standbyTasks);
+        // the second consumer
+        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        allActiveTasks.addAll(info11.activeTasks);
+        allStandbyTasks.addAll(info11.standbyTasks.keySet());
 
-        activeTasks.clear();
-        for (TopicPartition partition : assignments.get("consumer11").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
-
-        allActiveTasks.addAll(info.activeTasks);
-        allStandbyTasks.addAll(info.standbyTasks);
-
-        // check tasks assigned to the first client
+        // check active tasks assigned to the first client
         assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+        assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
 
-        activeTasks.clear();
-        for (TopicPartition partition : assignments.get("consumer20").partitions()) {
-            activeTasks.add(new TaskId(0, partition.partition()));
-        }
-        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
-        assertEquals(activeTasks, info.activeTasks);
-        assertEquals(2, info.activeTasks.size());
-        assertEquals(1, new HashSet<>(info.activeTasks).size());
+        // the third consumer
+        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        allActiveTasks.addAll(info20.activeTasks);
+        allStandbyTasks.addAll(info20.standbyTasks.keySet());
 
-        allActiveTasks.addAll(info.activeTasks);
-        allStandbyTasks.addAll(info.standbyTasks);
+        // all task ids are in the active tasks and also in the standby tasks
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -348,6 +304,48 @@ public class KafkaStreamingPartitionAssignorTest {
         assertEquals(allTasks, new HashSet<>(allStandbyTasks));
     }
 
+    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+        // This assumes 1) DefaultPartitionGrouper is used, and 2) there is only one topic group.
+
+        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+        // check if the number of assigned partitions == the size of active task id list
+        assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+        // check if active tasks are consistent
+        List<TaskId> activeTasks = new ArrayList<>();
+        Set<String> activeTopics = new HashSet<>();
+        for (TopicPartition partition : assignment.partitions()) {
+            // since default grouper, taskid.partition == partition.partition()
+            activeTasks.add(new TaskId(0, partition.partition()));
+            activeTopics.add(partition.topic());
+        }
+        assertEquals(activeTasks, info.activeTasks);
+
+        // check if active partitions cover all topics
+        assertEquals(allTopics, activeTopics);
+
+        // check if standby tasks are consistent
+        Set<String> standbyTopics = new HashSet<>();
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+            TaskId id = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+            for (TopicPartition partition : partitions) {
+                // since default grouper, taskid.partition == partition.partition()
+                assertEquals(id.partition, partition.partition());
+
+                standbyTopics.add(partition.topic());
+            }
+        }
+
+        if (info.standbyTasks.size() > 0)
+            // check if standby partitions cover all topics
+            assertEquals(allTopics, standbyTopics);
+
+        return info;
+    }
+
     @Test
     public void testOnAssignment() throws Exception {
         StreamingConfig config = new StreamingConfig(configProps());
@@ -369,7 +367,10 @@ public class KafkaStreamingPartitionAssignorTest {
         partitionAssignor.configure(config.getConsumerConfigs(thread));
 
         List<TaskId> activeTaskList = Utils.mkList(task0, task3);
-        Set<TaskId> standbyTasks = Utils.mkSet(task1, task2);
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
+        standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+
         AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
         PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
         partitionAssignor.onAssignment(assignment);

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b8a6990..9a43e46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -41,6 +43,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
@@ -53,6 +56,7 @@ public class StandbyTaskTest {
     private final TopicPartition partition1 = new TopicPartition("store1", 1);
     private final TopicPartition partition2 = new TopicPartition("store2", 1);
 
+    private final Set<TopicPartition> topicPartitions = Collections.emptySet();
     private final ProcessorTopology topology = new ProcessorTopology(
             Collections.<ProcessorNode>emptyList(),
             Collections.<String, SourceNode>emptyMap(),
@@ -78,6 +82,7 @@ public class StandbyTaskTest {
         });
     }
 
+    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final ProcessorStateManagerTest.MockRestoreConsumer restoreStateConsumer = new ProcessorStateManagerTest.MockRestoreConsumer();
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
@@ -104,7 +109,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
 
@@ -119,7 +124,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -138,7 +143,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+            StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a95c2fa..aae5a7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -103,7 +103,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+            StreamTask task = new StreamTask(new TaskId(0, 0), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@@ -154,7 +154,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(1, 1), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+            StreamTask task = new StreamTask(new TaskId(1, 1), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 02d0ac7..9f31450 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -126,13 +126,13 @@ public class StreamThreadTest {
         public boolean committed = false;
 
         public TestStreamTask(TaskId id,
+                              Collection<TopicPartition> partitions,
+                              ProcessorTopology topology,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
-                              Collection<TopicPartition> partitions,
-                              ProcessorTopology topology,
                               StreamingConfig config) {
-            super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
+            super(id, partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -163,7 +163,7 @@ public class StreamThreadTest {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+                return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -288,7 +288,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -410,7 +410,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
index 58e0af9..14a7f9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.TaskId;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -33,8 +36,10 @@ public class AssginmentInfoTest {
     public void testEncodeDecode() {
         List<TaskId> activeTasks =
                 Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
-        Set<TaskId> standbyTasks =
-                new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
 
         AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks);
         AssignmentInfo decoded = AssignmentInfo.decode(info.encode());

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index ca5f33d..119f08f 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -17,28 +17,44 @@
 
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
 
+import java.io.File;
 import java.util.List;
 
 public class KStreamTestDriver {
 
     private final ProcessorTopology topology;
     private final MockProcessorContext context;
+    public final File stateDir;
+
     private ProcessorNode currNode;
 
     public KStreamTestDriver(KStreamBuilder builder) {
-        this(builder, null, null);
+        this(builder, null, null, null, null, null);
     }
 
-    public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) {
+    public KStreamTestDriver(KStreamBuilder builder,
+                             File stateDir,
+                             Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
+                             Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
         this.topology = builder.build(null);
-        this.context = new MockProcessorContext(this, serializer, deserializer);
+        this.context = new MockProcessorContext(this, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
+        this.stateDir = stateDir;
+
+        for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
+            StateStore store = stateStoreSupplier.get();
+            store.init(context);
+        }
 
         for (ProcessorNode node : topology.processors()) {
             currNode = node;
@@ -50,6 +66,10 @@ public class KStreamTestDriver {
         }
     }
 
+    public ProcessorContext context() {
+        return context;
+    }
+
     public void process(String topicName, Object key, Object value) {
         currNode = topology.source(topicName);
         try {
@@ -92,4 +112,21 @@ public class KStreamTestDriver {
         }
     }
 
+    private class MockRecordCollector extends RecordCollector {
+        public MockRecordCollector() {
+            super(null);
+        }
+
+        public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+            // The serialization is skipped.
+            process(record.topic(), record.key(), record.value());
+        }
+
+        public void flush() {
+        }
+
+        public void close() {
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 40f11a0..81a9add 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -109,17 +110,24 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public File stateDir() {
-        throw new UnsupportedOperationException("stateDir() not supported.");
+        return driver.stateDir;
     }
 
     @Override
     public StreamingMetrics metrics() {
-        throw new UnsupportedOperationException("metrics() not supported.");
+        return new StreamingMetrics() {
+            @Override
+            public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
+                return null;
+            }
+            @Override
+            public void recordLatency(Sensor sensor, long startNs, long endNs) {
+            }
+        };
     }
 
     @Override
     public void register(StateStore store, StateRestoreCallback func) {
-        if (func != null) throw new UnsupportedOperationException("StateRestoreCallback not supported.");
         storeMap.put(store.name(), store);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 5f796c6..fdb4d57 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -158,11 +158,11 @@ public class ProcessorTopologyTestDriver {
         }
 
         task = new StreamTask(id,
+            partitionsByTopic.values(),
+            topology,
             consumer,
             producer,
             restoreStateConsumer,
-            partitionsByTopic.values(),
-            topology,
             config,
             new StreamingMetrics() {
                 @Override


[2/2] kafka git commit: KAFKA-2856: Add KTable non-stateful APIs along with standby task support

Posted by gu...@apache.org.
KAFKA-2856: Add KTable non-stateful APIs along with standby task support

guozhangwang
* added KTable API and impl
* added standby support for KTable

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #604 from ymatsuda/add_ktable


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39c3512e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39c3512e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39c3512e

Branch: refs/heads/trunk
Commit: 39c3512eceedebcb6e50f8c6c4ef66601ff7dbc4
Parents: cd54fc8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Dec 4 14:59:24 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Dec 4 14:59:24 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   |  22 +-
 .../kafka/streams/kstream/KStreamBuilder.java   |  41 +++
 .../apache/kafka/streams/kstream/KTable.java    | 108 ++++++++
 .../streams/kstream/internals/KStreamImpl.java  |   2 +-
 .../KTableDerivedValueGetterSupplier.java       |  28 ++
 .../streams/kstream/internals/KTableFilter.java |  87 ++++++
 .../streams/kstream/internals/KTableImpl.java   | 188 +++++++++++++
 .../kstream/internals/KTableMapValues.java      |  85 ++++++
 .../internals/KTableProcessorSupplier.java      |  26 ++
 .../streams/kstream/internals/KTableSource.java |  78 ++++++
 .../KTableSourceValueGetterSupplier.java        |  50 ++++
 .../kstream/internals/KTableStoreSupplier.java  |  58 ++++
 .../kstream/internals/KTableValueGetter.java    |  28 ++
 .../internals/KTableValueGetterSupplier.java    |  24 ++
 .../streams/processor/PartitionGrouper.java     |   2 +-
 .../apache/kafka/streams/processor/TaskId.java  |  12 +
 .../processor/internals/AbstractTask.java       |  14 +-
 .../KafkaStreamingPartitionAssignor.java        |  20 +-
 .../internals/ProcessorStateManager.java        |  49 +++-
 .../processor/internals/StandbyTask.java        |  25 +-
 .../streams/processor/internals/StreamTask.java |  18 +-
 .../processor/internals/StreamThread.java       |  78 ++++--
 .../internals/assignment/AssignmentInfo.java    | 129 +++++----
 .../state/KeyValueStoreChangeLogger.java        |  87 ++++++
 .../streams/state/MeteredKeyValueStore.java     |  95 +++----
 .../state/RocksDBKeyValueStoreSupplier.java     | 252 +-----------------
 .../kafka/streams/state/RocksDBStore.java       | 265 +++++++++++++++++++
 .../org/apache/kafka/streams/state/Serdes.java  |   3 +-
 .../kstream/internals/KTableFilterTest.java     | 137 ++++++++++
 .../kstream/internals/KTableImplTest.java       | 220 +++++++++++++++
 .../internals/KTableMapValuesImplTest.java      | 198 ++++++++++++++
 .../kstream/internals/KTableSourceTest.java     | 117 ++++++++
 .../KafkaStreamingPartitionAssignorTest.java    | 151 +++++------
 .../processor/internals/StandbyTaskTest.java    |  11 +-
 .../processor/internals/StreamTaskTest.java     |   4 +-
 .../processor/internals/StreamThreadTest.java   |  12 +-
 .../assignment/AssginmentInfoTest.java          |  11 +-
 .../apache/kafka/test/KStreamTestDriver.java    |  43 ++-
 .../apache/kafka/test/MockProcessorContext.java |  14 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   4 +-
 40 files changed, 2268 insertions(+), 528 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 992bd75..93303eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -33,7 +33,7 @@ public interface KStream<K, V> {
      * Creates a new instance of KStream consists of all elements of this stream which satisfy a predicate
      *
      * @param predicate the instance of Predicate
-     * @return the stream with only those elements that satisfy the predicate
+     * @return the instance of KStream with only those elements that satisfy the predicate
      */
     KStream<K, V> filter(Predicate<K, V> predicate);
 
@@ -41,22 +41,22 @@ public interface KStream<K, V> {
      * Creates a new instance of KStream consists all elements of this stream which do not satisfy a predicate
      *
      * @param predicate the instance of Predicate
-     * @return the stream with only those elements that do not satisfy the predicate
+     * @return the instance of KStream with only those elements that do not satisfy the predicate
      */
     KStream<K, V> filterOut(Predicate<K, V> predicate);
 
     /**
-     * Creates a new stream by applying transforming each element in this stream into a different element in the new stream.
+     * Creates a new instance of KStream by transforming each element in this stream into a different element in the new stream.
      *
      * @param mapper the instance of KeyValueMapper
      * @param <K1>   the key type of the new stream
      * @param <V1>   the value type of the new stream
-     * @return the mapped stream
+     * @return the instance of KStream
      */
     <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
 
     /**
-     * Creates a new instance of KStream by applying transforming each value in this stream into a different value in the new stream.
+     * Creates a new instance of KStream by transforming each value in this stream into a different value in the new stream.
      *
      * @param mapper the instance of ValueMapper
      * @param <V1>   the value type of the new stream
@@ -65,7 +65,7 @@ public interface KStream<K, V> {
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
-     * Creates a new instance of KStream by applying transforming each element in this stream into zero or more elements in the new stream.
+     * Creates a new instance of KStream by transforming each element in this stream into zero or more elements in the new stream.
      *
      * @param mapper the instance of KeyValueMapper
      * @param <K1>   the key type of the new stream
@@ -75,7 +75,7 @@ public interface KStream<K, V> {
     <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
 
     /**
-     * Creates a new instance of KStream by applying transforming each value in this stream into zero or more values in the new stream.
+     * Creates a new stream by transforming each value in this stream into zero or more values in the new stream.
      *
      * @param processor the instance of Processor
      * @param <V1>      the value type of the new stream
@@ -103,11 +103,11 @@ public interface KStream<K, V> {
     KStream<K, V>[] branch(Predicate<K, V>... predicates);
 
     /**
-     * Sends key-value to a topic, also creates a new stream from the topic.
+     * Sends key-value to a topic, also creates a new instance of KStream from the topic.
      * This is equivalent to calling to(topic) and from(topic).
      *
      * @param topic           the topic name
-     * @return the new stream that consumes the given topic
+     * @return the instance of KStream that consumes the given topic
      */
     KStream<K, V> through(String topic);
 
@@ -124,7 +124,7 @@ public interface KStream<K, V> {
      *                        if not specified the default key deserializer defined in the configuration will be used
      * @param valDeserializer value deserializer used to create the new KStream,
      *                        if not specified the default value deserializer defined in the configuration will be used
-     * @return the new stream that consumes the given topic
+     * @return the instance of KStream that consumes the given topic
      */
     KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
 
@@ -160,7 +160,7 @@ public interface KStream<K, V> {
      *
      * @param valueTransformerSupplier the class of TransformerDef
      * @param stateStoreNames the names of the state store used by the processor
-     * @return the instance of KStream that contains transformed keys and values
+     * @return the instance of KStream that contains the keys and transformed values
      */
     <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ae8f694..ca1a10d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -18,7 +18,11 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
+import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 
 import java.util.Collections;
@@ -65,6 +69,43 @@ public class KStreamBuilder extends TopologyBuilder {
     }
 
     /**
+     * Creates a KTable instance for the specified topic.
+     * The default deserializers specified in the config are used.
+     *
+     * @param topic          the topic name
+     * @return KTable
+     */
+    public <K, V> KTable<K, V> table(String topic) {
+        return table(null, null, null, null, topic);
+    }
+
+    /**
+     * Creates a KTable instance for the specified topic.
+     *
+     * @param keySerializer   key serializer used to send key-value pairs,
+     *                        if not specified the default key serializer defined in the configuration will be used
+     * @param valSerializer   value serializer used to send key-value pairs,
+     *                        if not specified the default value serializer defined in the configuration will be used
+     * @param keyDeserializer key deserializer used to read this source KTable,
+     *                        if not specified the default deserializer defined in the configs will be used
+     * @param valDeserializer value deserializer used to read this source KTable,
+     *                        if not specified the default deserializer defined in the configs will be used
+     * @param topic          the topic name
+     * @return KTable
+     */
+    public <K, V> KTable<K, V> table(Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String topic) {
+        String source = newName(KStreamImpl.SOURCE_NAME);
+        String name = newName(KTableImpl.SOURCE_NAME);
+
+        addSource(source, keyDeserializer, valDeserializer, topic);
+
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableSource<>(topic);
+        addProcessor(name, processorSupplier, source);
+
+        return new KTableImpl<>(this, name, processorSupplier, source, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer);
+    }
+
+    /**
      * Creates a new stream by merging the given streams
      *
      * @param streams the streams to be merged

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
new file mode 100644
index 0000000..75fb87a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * KTable is an abstraction of a change log stream.
+ *
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public interface KTable<K, V> {
+
+    /**
+     * Creates a new instance of KTable consisting of all elements of this table that satisfy a predicate
+     *
+     * @param predicate the instance of Predicate
+     * @return the instance of KTable with only those elements that satisfy the predicate
+     */
+    KTable<K, V> filter(Predicate<K, V> predicate);
+
+    /**
+     * Creates a new instance of KTable consisting of all elements of this table that do not satisfy a predicate
+     *
+     * @param predicate the instance of Predicate
+     * @return the instance of KTable with only those elements that do not satisfy the predicate
+     */
+    KTable<K, V> filterOut(Predicate<K, V> predicate);
+
+    /**
+     * Creates a new instance of KTable by transforming each value in this stream into a different value in the new stream.
+     *
+     * @param mapper the instance of ValueMapper
+     * @param <V1>   the value type of the new stream
+     * @return the instance of KTable
+     */
+    <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
+
+    /**
+     * Sends key-value to a topic, also creates a new instance of KTable from the topic.
+     * This is equivalent to calling to(topic) and table(topic).
+     *
+     * @param topic           the topic name
+     * @return the instance of KTable that consumes the given topic
+     */
+    KTable<K, V> through(String topic);
+
+    /**
+     * Sends key-value to a topic, also creates a new instance of KTable from the topic.
+     * This is equivalent to calling to(topic) and table(topic).
+     *
+     * @param topic           the topic name
+     * @param keySerializer   key serializer used to send key-value pairs,
+     *                        if not specified the default key serializer defined in the configuration will be used
+     * @param valSerializer   value serializer used to send key-value pairs,
+     *                        if not specified the default value serializer defined in the configuration will be used
+     * @param keyDeserializer key deserializer used to create the new KTable,
+     *                        if not specified the default key deserializer defined in the configuration will be used
+     * @param valDeserializer value deserializer used to create the new KTable,
+     *                        if not specified the default value deserializer defined in the configuration will be used
+     * @return the instance of KTable that consumes the given topic
+     */
+    KTable<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
+
+    /**
+     * Sends key-value to a topic using default serializers specified in the config.
+     *
+     * @param topic         the topic name
+     */
+    void to(String topic);
+
+    /**
+     * Sends key-value to a topic.
+     *
+     * @param topic         the topic name
+     * @param keySerializer key serializer used to send key-value pairs,
+     *                      if not specified the default serializer defined in the configs will be used
+     * @param valSerializer value serializer used to send key-value pairs,
+     *                      if not specified the default serializer defined in the configs will be used
+     */
+    void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
+
+    /**
+     * Creates a new instance of KStream from this KTable
+     *
+     * @return the instance of KStream
+     */
+    KStream<K, V> toStream();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 04aa8e9..fc8f4c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -59,7 +59,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
-    private static final String SINK_NAME = "KSTREAM-SINK-";
+    public static final String SINK_NAME = "KSTREAM-SINK-";
 
     public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
new file mode 100644
index 0000000..731d7f7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public abstract class KTableDerivedValueGetterSupplier<K, V1, V2> implements KTableValueGetterSupplier<K, V2> {
+
+    protected final KTableValueGetterSupplier<K, V1> parentValueGetterSupplier;
+
+    public KTableDerivedValueGetterSupplier(KTableValueGetterSupplier<K, V1> parentValueGetterSupplier) {
+        this.parentValueGetterSupplier = parentValueGetterSupplier;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
new file mode 100644
index 0000000..212b1c9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
+
+    private final Predicate<K, V> predicate;
+    private final boolean filterOut;
+
+    public KTableFilter(Predicate<K, V> predicate, boolean filterOut) {
+        this.predicate = predicate;
+        this.filterOut = filterOut;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KTableFilterProcessor();
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+        return new KTableDerivedValueGetterSupplier<K, V, V>(parentValueGetterSupplier) {
+
+            public KTableValueGetter<K, V> get() {
+                return new KTableFilterValueGetter(parentValueGetterSupplier.get());
+            }
+
+        };
+    }
+
+    private V computeNewValue(K key, V value) {
+        V newValue = null;
+
+        if (value != null && (filterOut ^ predicate.test(key, value)))
+            newValue = value;
+
+        return newValue;
+    }
+
+    private class KTableFilterProcessor extends AbstractProcessor<K, V> {
+
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, computeNewValue(key, value));
+        }
+
+    }
+
+    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V> parentGetter;
+
+        public KTableFilterValueGetter(KTableValueGetter<K, V> parentGetter) {
+            this.parentGetter = parentGetter;
+        }
+
+        public void init(ProcessorContext context) {
+            parentGetter.init(context);
+        }
+
+        public V get(K key) {
+            return computeNewValue(key, parentGetter.get(key));
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
new file mode 100644
index 0000000..5b2b031
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Collections;
+
+/**
+ * The implementation class of KTable
+ * @param <K> the key type
+ * @param <S> the source's (parent's) value type
+ * @param <V> the value type
+ */
+public class KTableImpl<K, S, V> implements KTable<K, V> {
+
+    private static final String FILTER_NAME = "KTABLE-FILTER-";
+
+    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
+
+    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+
+    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+
+    protected final KStreamBuilder topology;
+    public final String name;
+    public final KTableProcessorSupplier<K, S, V> processorSupplier;
+    private final String sourceNode;
+
+    private final KTableImpl<K, ?, S> parent;
+    private final String topic;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valSerializer;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valDeserializer;
+
+    public KTableImpl(KStreamBuilder topology,
+                      String name,
+                      KTableProcessorSupplier<K, S, V> processorSupplier,
+                      String sourceNode,
+                      KTableImpl<K, ?, S> parent) {
+        this(topology, name, processorSupplier, sourceNode, null, null, null, null, null, parent);
+    }
+
+    public KTableImpl(KStreamBuilder topology,
+                      String name,
+                      KTableProcessorSupplier<K, S, V> processorSupplier,
+                      String sourceNode,
+                      String topic,
+                      Serializer<K> keySerializer,
+                      Serializer<V> valSerializer,
+                      Deserializer<K> keyDeserializer,
+                      Deserializer<V> valDeserializer) {
+        this(topology, name, processorSupplier, sourceNode, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer, null);
+    }
+
+    private KTableImpl(KStreamBuilder topology,
+                       String name,
+                       KTableProcessorSupplier<K, S, V> processorSupplier,
+                       String sourceNode,
+                       String topic,
+                       Serializer<K> keySerializer,
+                       Serializer<V> valSerializer,
+                       Deserializer<K> keyDeserializer,
+                       Deserializer<V> valDeserializer,
+                       KTableImpl<K, ?, S> parent) {
+        this.topology = topology;
+        this.name = name;
+        this.processorSupplier = processorSupplier;
+        this.sourceNode = sourceNode;
+        this.topic = topic;
+        this.keySerializer = keySerializer;
+        this.valSerializer = valSerializer;
+        this.keyDeserializer = keyDeserializer;
+        this.valDeserializer = valDeserializer;
+        this.parent = parent;
+    }
+
+    @Override
+    public KTable<K, V> filter(Predicate<K, V> predicate) {
+        String name = topology.newName(FILTER_NAME);
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, false);
+        topology.addProcessor(name, processorSupplier, this.name);
+
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+    }
+
+    @Override
+    public KTable<K, V> filterOut(final Predicate<K, V> predicate) {
+        String name = topology.newName(FILTER_NAME);
+        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, true);
+
+        topology.addProcessor(name, processorSupplier, this.name);
+
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+    }
+
+    @Override
+    public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+        String name = topology.newName(MAPVALUES_NAME);
+        KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(mapper);
+
+        topology.addProcessor(name, processorSupplier, this.name);
+
+        return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+    }
+
+    @Override
+    public KTable<K, V> through(String topic,
+                                Serializer<K> keySerializer,
+                                Serializer<V> valSerializer,
+                                Deserializer<K> keyDeserializer,
+                                Deserializer<V> valDeserializer) {
+        String sendName = topology.newName(KStreamImpl.SINK_NAME);
+
+        topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+
+        return topology.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic);
+    }
+
+    @Override
+    public KTable<K, V> through(String topic) {
+        return through(topic, null, null, null, null);
+    }
+
+    @Override
+    public void to(String topic) {
+        String name = topology.newName(KStreamImpl.SINK_NAME);
+
+        topology.addSink(name, topic, this.name);
+    }
+
+    @Override
+    public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
+        String name = topology.newName(KStreamImpl.SINK_NAME);
+
+        topology.addSink(name, topic, keySerializer, valSerializer, this.name);
+    }
+
+    @Override
+    public KStream<K, V> toStream() {
+        String name = topology.newName(TOSTREAM_NAME);
+
+        topology.addProcessor(name, new KStreamPassThrough(), this.name);
+
+        return new KStreamImpl<>(topology, name, Collections.singleton(sourceNode));
+    }
+
+    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+        if (parent != null) {
+            return processorSupplier.view(parent.valueGetterSupplier());
+        } else {
+            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+            synchronized (source) {
+                if (!source.isMaterialized()) {
+                    StateStoreSupplier storeSupplier =
+                            new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+                    topology.addStateStore(storeSupplier, name);
+                    source.materialize();
+                }
+            }
+            return new KTableSourceValueGetterSupplier<>(topic);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
new file mode 100644
index 0000000..0d14390
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
+
+    private final ValueMapper<V1, V2> mapper;
+
+    public KTableMapValues(ValueMapper<V1, V2> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor<K1, V1> get() {
+        return new KTableMapProcessor();
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K1, V2> view(KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier) {
+        return new KTableDerivedValueGetterSupplier<K1, V1, V2>(parentValueGetterSupplier) {
+
+            public KTableValueGetter<K1, V2> get() {
+                return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
+            }
+
+        };
+    }
+
+    private V2 computeNewValue(V1 value) {
+        V2 newValue = null;
+
+        if (value != null)
+            newValue = mapper.apply(value);
+
+        return newValue;
+    }
+
+    private class KTableMapProcessor extends AbstractProcessor<K1, V1> {
+
+        @Override
+        public void process(K1 key, V1 value) {
+            context().forward(key, computeNewValue(value));
+        }
+
+    }
+
+    private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {
+
+        private final KTableValueGetter<K1, V1> parentGetter;
+
+        public KTableMapValuesValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+            this.parentGetter = parentGetter;
+        }
+
+        public void init(ProcessorContext context) {
+            parentGetter.init(context);
+        }
+
+        public V2 get(K1 key) {
+            return computeNewValue(parentGetter.get(key));
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
new file mode 100644
index 0000000..cc6467f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public abstract class KTableProcessorSupplier<K, V, T> implements ProcessorSupplier<K, V> {
+
+    public abstract KTableValueGetterSupplier<K, T> view(KTableValueGetterSupplier<K, V> parentValueGetterFactory);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
new file mode 100644
index 0000000..93790ed
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableSource<K, V> extends KTableProcessorSupplier<K, V, V> {
+    // Supplies the processors of a KTable source node; topic doubles as the name of the backing state store.
+    private final String topic;
+    // Set by materialize(); only processors created by later get() calls are affected.
+    private boolean materialized = false;
+
+    public KTableSource(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor();
+    }
+
+    public void materialize() {
+        materialized = true;
+    }
+
+    public boolean isMaterialized() {
+        return materialized;
+    }
+    /** Always throws IllegalStateException: a view cannot be defined on the source itself. */
+    @Override
+    public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+        throw new IllegalStateException("a view cannot be defined on the ktable source");
+    }
+    /** Pass-through processor: forwards each record downstream without storing it. */
+    private class KTableSourceProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, value);
+        }
+    }
+    /** Writes each record to the state store named after the topic, then forwards it downstream. */
+    private class MaterializedKTableSourceProcessor extends AbstractProcessor<K, V> {
+
+        private KeyValueStore<K, V> store;
+        // The store is looked up by name, so the cast to KeyValueStore<K, V> is unchecked.
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            store = (KeyValueStore<K, V>) context.getStateStore(topic);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            store.put(key, value);
+            context().forward(key, value);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
new file mode 100644
index 0000000..dab92d5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
+    // Supplies getters that read values from the state store named after the topic.
+    private final String topic;
+
+    public KTableSourceValueGetterSupplier(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public KTableValueGetter<K, V> get() {
+        return new KTableSourceValueGetter();
+    }
+
+    private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
+        private KeyValueStore<K, V> store = null; // resolved in init(); get() throws NullPointerException before that
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void init(ProcessorContext context) {
+            store = (KeyValueStore<K, V>) context.getStateStore(topic);
+        }
+
+        @Override
+        public V get(K key) {
+            return store.get(key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
new file mode 100644
index 0000000..d07fc5d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.RocksDBStore;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A KTable storage. It stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
+    // name is used both as the store name and as the first argument of the Serdes built in the constructor.
+    private final String name;
+    private final Serdes<K, V> serdes;
+    private final Time time;
+    // Bundles the key/value serializers and deserializers into a single Serdes and keeps the Time for get().
+    protected KTableStoreSupplier(String name,
+                                  Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
+                                  Serializer<V> valSerializer, Deserializer<V> valDeserializer,
+                                  Time time) {
+        this.name = name;
+        this.serdes = new Serdes<>(name, keySerializer, keyDeserializer, valSerializer, valDeserializer);
+        this.time = time;
+    }
+    // Returns the name passed to the constructor.
+    public String name() {
+        return name;
+    }
+    // Creates a new RocksDBStore wrapped in a MeteredKeyValueStore on every call; disableLogging() is applied to the wrapper.
+    public StateStore get() {
+        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), serdes, "rocksdb-state", time).disableLogging();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
new file mode 100644
index 0000000..53ec6ba
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public interface KTableValueGetter<K, V> {
+    // Prepares the getter with the processor context; the getter of KTableSourceValueGetterSupplier resolves its state store here.
+    void init(ProcessorContext context);
+    // Returns the value for the key; the getter of KTableSourceValueGetterSupplier reads its store, so init must run first.
+    V get(K key);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
new file mode 100644
index 0000000..1ab6ba6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public interface KTableValueGetterSupplier<K, V> {
+    // Supplies a KTableValueGetter; KTableSourceValueGetterSupplier returns a new getter instance on every call.
+    KTableValueGetter<K, V> get();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 00b56b3..187c4ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -50,7 +50,7 @@ public abstract class PartitionGrouper {
         return partitionAssignor.taskIds(partition);
     }
 
-    public Set<TaskId> standbyTasks() {
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
         return partitionAssignor.standbyTasks();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 5344f6c..023bbbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.streams.processor;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class TaskId implements Comparable<TaskId> {
@@ -47,6 +50,15 @@ public class TaskId implements Comparable<TaskId> {
         }
     }
 
+    public void writeTo(DataOutputStream out) throws IOException {
+        out.writeInt(topicGroupId); // written as two ints: topicGroupId first, then partition
+        out.writeInt(partition);
+    }
+
+    public static TaskId readFrom(DataInputStream in) throws IOException {
+        return new TaskId(in.readInt(), in.readInt()); // arguments are evaluated left to right: the first int read is the first constructor argument
+    }
+
     public void writeTo(ByteBuffer buf) {
         buf.putInt(topicGroupId);
         buf.putInt(partition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 14037ab..e1b4d62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -28,31 +28,37 @@ import org.apache.kafka.streams.processor.TaskId;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 public abstract class AbstractTask {
     protected final TaskId id;
     protected final ProcessorTopology topology;
+    protected final Consumer consumer;
     protected final ProcessorStateManager stateMgr;
     protected final Set<TopicPartition> partitions;
     protected ProcessorContext processorContext;
 
     protected AbstractTask(TaskId id,
-                           Consumer<byte[], byte[]> restoreConsumer,
+                           Collection<TopicPartition> partitions,
                            ProcessorTopology topology,
+                           Consumer<byte[], byte[]> consumer,
+                           Consumer<byte[], byte[]> restoreConsumer,
                            StreamingConfig config,
-                           Set<TopicPartition> partitions) {
+                           boolean isStandby) {
         this.id = id;
+        this.partitions = new HashSet<>(partitions);
         this.topology = topology;
-        this.partitions = partitions;
+        this.consumer = consumer;
 
         // create the processor state manager
         try {
             File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
             // if partitions is null, this is a standby task
-            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null);
+            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {
             throw new KafkaException("Error while creating the state manager", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 451b214..54d5567 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -48,7 +48,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
     private StreamThread streamThread;
     private int numStandbyReplicas;
     private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-    private Set<TaskId> standbyTasks;
+    private Map<TaskId, Set<TopicPartition>> standbyTasks;
 
     @Override
     public void configure(Map<String, ?> configs) {
@@ -154,28 +154,32 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
 
             final int numConsumers = consumers.size();
             List<TaskId> active = new ArrayList<>();
-            Set<TaskId> standby = new HashSet<>();
+            Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
 
             int i = 0;
             for (String consumer : consumers) {
-                List<TopicPartition> partitions = new ArrayList<>();
+                List<TopicPartition> activePartitions = new ArrayList<>();
 
                 final int numTaskIds = taskIds.size();
                 for (int j = i; j < numTaskIds; j += numConsumers) {
                     TaskId taskId = taskIds.get(j);
                     if (j < numActiveTasks) {
                         for (TopicPartition partition : partitionGroups.get(taskId)) {
-                            partitions.add(partition);
+                            activePartitions.add(partition);
                             active.add(taskId);
                         }
                     } else {
-                        // no partition to a standby task
-                        standby.add(taskId);
+                        Set<TopicPartition> standbyPartitions = standby.get(taskId);
+                        if (standbyPartitions == null) {
+                            standbyPartitions = new HashSet<>();
+                            standby.put(taskId, standbyPartitions);
+                        }
+                        standbyPartitions.addAll(partitionGroups.get(taskId));
                     }
                 }
 
                 AssignmentInfo data = new AssignmentInfo(active, standby);
-                assignment.put(consumer, new Assignment(partitions, data.encode()));
+                assignment.put(consumer, new Assignment(activePartitions, data.encode()));
                 i++;
 
                 active.clear();
@@ -220,7 +224,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         return partitionToTaskIds.get(partition);
     }
 
-    public Set<TaskId> standbyTasks() {
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
         return standbyTasks;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2a8df9e..4cff02d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -33,6 +33,7 @@ import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -52,6 +53,7 @@ public class ProcessorStateManager {
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final Map<TopicPartition, Long> restoredOffsets;
     private final Map<TopicPartition, Long> checkpointedOffsets;
+    private final Map<TopicPartition, Long> offsetLimits;
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
 
@@ -63,6 +65,7 @@ public class ProcessorStateManager {
         this.restoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
         this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
+        this.offsetLimits = new HashMap<>();
 
         // create the state directory for this task if missing (we won't create the parent directory)
         createStateDirectory(baseDir);
@@ -165,8 +168,10 @@ public class ProcessorStateManager {
 
             // restore its state from changelog records; while restoring the log end offset
             // should not change since it is only written by this thread.
+            long limit = offsetLimit(storePartition);
             while (true) {
                 for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
+                    if (record.offset() >= limit) break;
                     stateRestoreCallback.restore(record.key(), record.value());
                 }
 
@@ -178,7 +183,7 @@ public class ProcessorStateManager {
             }
 
             // record the restored offset for its change log partition
-            long newOffset = restoreConsumer.position(storePartition);
+            long newOffset = Math.min(limit, restoreConsumer.position(storePartition));
             restoredOffsets.put(storePartition, newOffset);
         } finally {
             // un-assign the change log partition
@@ -202,16 +207,40 @@ public class ProcessorStateManager {
         return partitionsAndOffsets;
     }
 
-    public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+    public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+        long limit = offsetLimit(storePartition);
+        List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
+        // Applies records below the offset limit to the store; returns the held-back records, or null if none.
         // restore states from changelog records
         StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
 
+        long lastOffset = -1L;
+        int count = 0;
         for (ConsumerRecord<byte[], byte[]> record : records) {
-            restoreCallback.restore(record.key(), record.value());
+            if (record.offset() < limit) {
+                restoreCallback.restore(record.key(), record.value());
+                lastOffset = record.offset();
+            } else {
+                if (remainingRecords == null)
+                    remainingRecords = new ArrayList<>(records.size() - count);
+
+                remainingRecords.add(record);
+            }
+            count++;
         }
         // record the restored offset for its change log partition
-        long newOffset = restoreConsumer.position(storePartition);
-        restoredOffsets.put(storePartition, newOffset);
+        if (lastOffset >= 0L) restoredOffsets.put(storePartition, lastOffset + 1); // nothing applied: keep the previous offset instead of resetting it to 0
+
+        return remainingRecords;
+    }
+
+    public void putOffsetLimit(TopicPartition partition, long limit) {
+        offsetLimits.put(partition, limit); // exclusive upper bound: only records with offset < limit are restored
+    }
+
+    private long offsetLimit(TopicPartition partition) {
+        Long limit = offsetLimits.get(partition); // null when no limit was ever put for this partition
+        return limit != null ? limit : Long.MAX_VALUE; // no limit recorded: effectively unbounded
     }
 
     public StateStore getStore(String name) {
@@ -253,14 +282,14 @@ public class ProcessorStateManager {
                 if (stores.get(storeName).persistent()) {
                     Long offset = ackedOffsets.get(part);
 
-                    if (offset == null) {
-                        // if no record was produced. we need to check the restored offset.
-                        offset = restoredOffsets.get(part);
-                    }
-
                     if (offset != null) {
                         // store the last offset + 1 (the log position after restoration)
                         checkpointOffsets.put(part, offset + 1);
+                    } else {
+                        // if no record was produced. we need to check the restored offset.
+                        offset = restoredOffsets.get(part);
+                        if (offset != null)
+                            checkpointOffsets.put(part, offset);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index c6442d9..d0d8493 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
@@ -50,11 +51,13 @@ public class StandbyTask extends AbstractTask {
      * @param metrics               the {@link StreamingMetrics} created by the thread
      */
     public StandbyTask(TaskId id,
-                       Consumer<byte[], byte[]> restoreConsumer,
+                       Collection<TopicPartition> partitions,
                        ProcessorTopology topology,
+                       Consumer<byte[], byte[]> consumer,
+                       Consumer<byte[], byte[]> restoreConsumer,
                        StreamingConfig config,
                        StreamingMetrics metrics) {
-        super(id, restoreConsumer, topology, config, null);
+        super(id, partitions, topology, consumer, restoreConsumer, config, true);
 
         // initialize the topology with its own context
         this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
@@ -64,6 +67,9 @@ public class StandbyTask extends AbstractTask {
         ((StandbyContextImpl) this.processorContext).initialized();
 
         this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+
+        // set initial offset limits
+        initializeOffsetLimits();
     }
 
     public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -76,13 +82,24 @@ public class StandbyTask extends AbstractTask {
 
     /**
      * Updates a state store using records from one change log partition
+     * @return a list of records not consumed
      */
-    public void update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
-        stateMgr.updateStandbyStates(partition, records);
+    public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
+        return stateMgr.updateStandbyStates(partition, records);
     }
 
     public void commit() {
         stateMgr.flush();
+
+        // reinitialize offset limits
+        initializeOffsetLimits();
+    }
+
+    protected void initializeOffsetLimits() {
+        for (TopicPartition partition : partitions) {
+            OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
+            stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L); // nothing committed yet: limit 0
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 16f0667..24c450e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -30,9 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -45,7 +43,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private final int maxBufferedSize;
 
-    private final Consumer consumer;
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
     private final PunctuationQueue punctuationQueue;
@@ -73,15 +70,14 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param metrics               the {@link StreamingMetrics} created by the thread
      */
     public StreamTask(TaskId id,
+                      Collection<TopicPartition> partitions,
+                      ProcessorTopology topology,
                       Consumer<byte[], byte[]> consumer,
                       Producer<byte[], byte[]> producer,
                       Consumer<byte[], byte[]> restoreConsumer,
-                      Collection<TopicPartition> partitions,
-                      ProcessorTopology topology,
                       StreamingConfig config,
                       StreamingMetrics metrics) {
-        super(id, restoreConsumer, topology, config, Collections.unmodifiableSet(new HashSet<>(partitions)));
-        this.consumer = consumer;
+        super(id, partitions, topology, consumer, restoreConsumer, config, false);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
@@ -98,7 +94,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
         this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
-        // initialize the consumed and produced offset cache
+        // initialize the consumed offset cache
         this.consumedOffsets = new HashMap<>();
 
         // create the record recordCollector that maintains the produced offsets
@@ -245,7 +241,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
         if (commitOffsetNeeded) {
             Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
             for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
-                consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
+                TopicPartition partition = entry.getKey();
+                long offset = entry.getValue() + 1;
+                consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
+                stateMgr.putOffsetLimit(partition, offset);
             }
             consumer.commitSync(consumedOffsetsAndMetadata);
             commitOffsetNeeded = false;
@@ -280,6 +279,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
     }
 
+    @Override
     public void close() {
         this.partitionGroup.close();
         this.consumedOffsets.clear();

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 31dca39..c77a027 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -73,6 +74,7 @@ public class StreamThread extends Thread {
 
     protected final StreamingConfig config;
     protected final TopologyBuilder builder;
+    protected final Set<String> sourceTopics;
     protected final Producer<byte[], byte[]> producer;
     protected final Consumer<byte[], byte[]> consumer;
     protected final Consumer<byte[], byte[]> restoreConsumer;
@@ -94,6 +96,9 @@ public class StreamThread extends Thread {
     private long lastCommit;
     private long recordsProcessed;
 
+    private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
+    private boolean processStandbyRecords = false;
+
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -133,6 +138,7 @@ public class StreamThread extends Thread {
 
         this.config = config;
         this.builder = builder;
+        this.sourceTopics = builder.sourceTopics();
         this.clientId = clientId;
         this.clientUUID = clientUUID;
         this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -148,6 +154,9 @@ public class StreamThread extends Thread {
         this.standbyTasks = new HashMap<>();
         this.prevTasks = new HashSet<>();
 
+        // standby ktables: records returned as remaining by StandbyTask.update(), buffered per partition
+        this.standbyRecords = new HashMap<>();
+
         // read in task specific config values
         this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
         this.stateDir.mkdir();
@@ -256,7 +265,7 @@ public class StreamThread extends Thread {
 
             ensureCopartitioning(builder.copartitionGroups());
 
-            consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
+            consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
 
             while (stillRunning()) {
                 // try to fetch some records if necessary
@@ -293,15 +302,12 @@ public class StreamThread extends Thread {
                     }
 
                     maybePunctuate();
-                    maybeCommit();
                 } else {
                     // even when no task is assigned, we must poll to get a task.
                     requiresPoll = true;
                 }
-
-                if (!standbyTasks.isEmpty()) {
-                    updateStandbyTasks();
-                }
+                maybeCommit();
+                maybeUpdateStandbyTasks();
 
                 maybeClean();
             }
@@ -310,13 +316,38 @@ public class StreamThread extends Thread {
         }
     }
 
-    private void updateStandbyTasks() {
-        ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+    private void maybeUpdateStandbyTasks() {
+        if (!standbyTasks.isEmpty()) {
+            if (processStandbyRecords) {
+                if (!standbyRecords.isEmpty()) {
+                    for (StandbyTask task : standbyTasks.values()) {
+                        for (TopicPartition partition : task.changeLogPartitions()) {
+                            List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition);
+                            if (remaining != null) {
+                                remaining = task.update(partition, remaining);
+                                if (remaining != null) {
+                                    standbyRecords.put(partition, remaining);
+                                } else {
+                                    restoreConsumer.resume(partition);
+                                }
+                            }
+                        }
+                    }
+                }
+                processStandbyRecords = false;
+            }
 
-        if (!records.isEmpty()) {
-            for (StandbyTask task : standbyTasks.values()) {
-                for (TopicPartition partition : task.changeLogPartitions()) {
-                    task.update(partition, records.records(partition));
+            ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+
+            if (!records.isEmpty()) {
+                for (StandbyTask task : standbyTasks.values()) {
+                    for (TopicPartition partition : task.changeLogPartitions()) {
+                        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
+                        if (remaining != null) {
+                            restoreConsumer.pause(partition);
+                            standbyRecords.put(partition, remaining);
+                        }
+                    }
                 }
             }
         }
@@ -359,6 +390,8 @@ public class StreamThread extends Thread {
 
             commitAll();
             lastCommit = now;
+
+            processStandbyRecords = true;
         } else {
             for (StreamTask task : activeTasks.values()) {
                 try {
@@ -478,12 +511,12 @@ public class StreamThread extends Thread {
         return tasks;
     }
 
-    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
+    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
-        return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
+        return new StreamTask(id, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -501,7 +534,7 @@ public class StreamThread extends Thread {
             }
         }
 
-        // create the tasks
+        // create the active tasks
         for (TaskId taskId : partitionsForTask.keySet()) {
             try {
                 activeTasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
@@ -510,8 +543,6 @@ public class StreamThread extends Thread {
                 throw e;
             }
         }
-
-        lastClean = time.milliseconds();
     }
 
     private void removeStreamTasks() {
@@ -537,13 +568,13 @@ public class StreamThread extends Thread {
         sensors.taskDestructionSensor.record();
     }
 
-    protected StandbyTask createStandbyTask(TaskId id) {
+    protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
-            return new StandbyTask(id, restoreConsumer, topology, config, sensors);
+            return new StandbyTask(id, partitions, topology, consumer, restoreConsumer, config, sensors);
         } else {
             return null;
         }
@@ -552,10 +583,15 @@ public class StreamThread extends Thread {
     private void addStandbyTasks() {
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
-        for (TaskId taskId : partitionGrouper.standbyTasks()) {
-            StandbyTask task = createStandbyTask(taskId);
+        // create the standby tasks
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionGrouper.standbyTasks().entrySet()) {
+            TaskId taskId = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+            StandbyTask task = createStandbyTask(taskId, partitions);
             if (task != null) {
                 standbyTasks.put(taskId, task);
+                // collect checkpointed offsets to position the restore consumer
+                // this includes all partitions from which we restore states
                 checkpointedOffsets.putAll(task.checkpointedOffsets());
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index d82dd7d..2bd4457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -17,14 +17,23 @@
 
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.ByteBufferInputStream;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class AssignmentInfo {
@@ -33,70 +42,98 @@ public class AssignmentInfo {
 
     public final int version;
     public final List<TaskId> activeTasks; // each element corresponds to a partition
-    public final Set<TaskId> standbyTasks;
+    public final Map<TaskId, Set<TopicPartition>> standbyTasks;
 
-    public AssignmentInfo(List<TaskId> activeTasks, Set<TaskId> standbyTasks) {
+    public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
         this(1, activeTasks, standbyTasks);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Set<TaskId> standbyTasks) {
+    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
         this.version = version;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
     }
 
     public ByteBuffer encode() {
-        if (version == 1) {
-            ByteBuffer buf = ByteBuffer.allocate(4 + 4 + activeTasks.size() * 8 + 4 + standbyTasks.size() * 8);
-            // Encode version
-            buf.putInt(1);
-            // Encode active tasks
-            buf.putInt(activeTasks.size());
-            for (TaskId id : activeTasks) {
-                id.writeTo(buf);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(baos);
+
+        try {
+            if (version == 1) {
+                // Encode version
+                out.writeInt(1);
+                // Encode active tasks
+                out.writeInt(activeTasks.size());
+                for (TaskId id : activeTasks) {
+                    id.writeTo(out);
+                }
+                // Encode standby tasks
+                out.writeInt(standbyTasks.size());
+                for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+                    TaskId id = entry.getKey();
+                    id.writeTo(out);
+
+                    Set<TopicPartition> partitions = entry.getValue();
+                    out.writeInt(partitions.size());
+                    for (TopicPartition partition : partitions) {
+                        out.writeUTF(partition.topic());
+                        out.writeInt(partition.partition());
+                    }
+                }
+
+                out.flush();
+                out.close();
+
+                return ByteBuffer.wrap(baos.toByteArray());
+
+            } else {
+                TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
+                log.error(ex.getMessage(), ex);
+                throw ex;
             }
-            // Encode standby tasks
-            buf.putInt(standbyTasks.size());
-            for (TaskId id : standbyTasks) {
-                id.writeTo(buf);
-            }
-            buf.rewind();
-
-            return buf;
-
-        } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+        } catch (IOException ex) {
+            throw new KafkaException("failed to encode AssignmentInfo", ex);
         }
     }
 
     public static AssignmentInfo decode(ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
-
-        // Decode version
-        int version = data.getInt();
-        if (version == 1) {
-           // Decode active tasks
-            int count = data.getInt();
-            List<TaskId> activeTasks = new ArrayList<>(count);
-            for (int i = 0; i < count; i++) {
-                activeTasks.add(TaskId.readFrom(data));
+        DataInputStream in = new DataInputStream(new ByteBufferInputStream(data));
+
+        try {
+            // Decode version
+            int version = in.readInt();
+            if (version == 1) {
+                // Decode active tasks
+                int count = in.readInt();
+                List<TaskId> activeTasks = new ArrayList<>(count);
+                for (int i = 0; i < count; i++) {
+                    activeTasks.add(TaskId.readFrom(in));
+                }
+                // Decode standby tasks
+                count = in.readInt();
+                Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
+                for (int i = 0; i < count; i++) {
+                    TaskId id = TaskId.readFrom(in);
+
+                    int numPartitions = in.readInt();
+                    Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+                    for (int j = 0; j < numPartitions; j++) {
+                        partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
+                    }
+                    standbyTasks.put(id, partitions);
+                }
+
+                return new AssignmentInfo(activeTasks, standbyTasks);
+
+            } else {
+                TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
+                log.error(ex.getMessage(), ex);
+                throw ex;
             }
-            // Decode standby tasks
-            count = data.getInt();
-            Set<TaskId> standbyTasks = new HashSet<>(count);
-            for (int i = 0; i < count; i++) {
-                standbyTasks.add(TaskId.readFrom(data));
-            }
-
-            return new AssignmentInfo(activeTasks, standbyTasks);
-
-        } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+        } catch (IOException ex) {
+            throw new KafkaException("failed to decode AssignmentInfo", ex);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
new file mode 100644
index 0000000..2ad1f47
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class KeyValueStoreChangeLogger<K, V> { // buffers changed keys and sends them to a changelog topic in batches
+
+    protected final Serdes<K, V> serialization; // supplies the key/value serializers used in logChange
+
+    private final Set<K> dirty; // keys passed to add() since the buffers were last cleared by logChange
+    private final Set<K> removed; // keys passed to delete() since the buffers were last cleared by logChange
+    private final int maxDirty; // maybeLogChange flushes once dirty.size() exceeds this
+    private final int maxRemoved; // maybeLogChange flushes once removed.size() exceeds this
+
+    private final String topic; // topic every change record is sent to
+    private int partition; // set from context.id().partition; used as the partition of every record sent
+    private ProcessorContext context; // must also implement RecordCollector.Supplier (it is cast in logChange)
+
+    // always wrap the logged store with the metered store
+    public KeyValueStoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+        this.topic = topic;
+        this.serialization = serialization;
+        this.context = context;
+        this.partition = context.id().partition;
+
+        this.dirty = new HashSet<>();
+        this.removed = new HashSet<>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
+    }
+
+    public void add(K key) { // record key as updated; drops any pending removal of the same key
+        this.dirty.add(key);
+        this.removed.remove(key);
+    }
+
+    public void delete(K key) { // record key as removed; drops any pending update of the same key
+        this.dirty.remove(key);
+        this.removed.add(key);
+    }
+
+    public void maybeLogChange(KeyValueStore<K, V> kv) { // flushes only when either buffer is over its threshold
+        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+            logChange(kv);
+    }
+
+    public void logChange(KeyValueStore<K, V> kv) { // sends all buffered changes, then clears both buffers
+        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+        if (collector != null) { // with no collector nothing is sent and the buffers are left untouched
+            Serializer<K> keySerializer = serialization.keySerializer();
+            Serializer<V> valueSerializer = serialization.valueSerializer();
+
+            for (K k : this.removed) { // removed keys are sent with a null value
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+            }
+            for (K k : this.dirty) {
+                V v = kv.get(k); // value is read from the store at flush time, not when add() was called
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+            }
+            this.removed.clear();
+            this.dirty.clear();
+        }
+    }
+
+}