You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/06 21:14:37 UTC

[kafka] branch trunk updated: KAFKA-4750: Bypass null value and treat it as deletes (#4508)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a88db95  KAFKA-4750: Bypass null value and treat it as deletes (#4508)
a88db95 is described below

commit a88db9595910a1a7f249f7c9f3effb14636955c7
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Feb 6 13:14:33 2018 -0800

    KAFKA-4750: Bypass null value and treat it as deletes (#4508)
    
    Here is the new rule for handling nulls:
    * in the interface store, put(key, null) is handled normally and the value serde will still be applied to null, hence the serde needs to handle null values
    * in the inner bytes store, null bytes after serialization will be treated as deletes.
    * in the interface store, if null bytes get returned in get(key), it indicates the key is not available; hence the serde will be skipped and a null object will be returned.
    
    More changes:
    * Update javadocs, add unit tests accordingly; augment MockContext to set serdes for the newly added tests.
    * Fixed a bug that was exposed by the newly added tests.
    * Use the new API to remove all old APIs in the existing state store tests.
    * Remove SerializedKeyValueIterator since it is not used any more.
    
    This is originally contributed by @evis.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@confluent.io>
---
 .../apache/kafka/streams/kstream/Materialized.java |  3 +-
 .../streams/state/KeyValueBytesStoreSupplier.java  |  5 ++
 .../apache/kafka/streams/state/KeyValueStore.java  |  9 ++-
 .../streams/state/SessionBytesStoreSupplier.java   |  7 +-
 .../apache/kafka/streams/state/SessionStore.java   |  9 ++-
 .../org/apache/kafka/streams/state/Stores.java     |  9 ++-
 .../streams/state/WindowBytesStoreSupplier.java    |  7 +-
 .../apache/kafka/streams/state/WindowStore.java    |  5 +-
 .../state/internals/CachingKeyValueStore.java      | 30 ++++---
 .../state/internals/CachingSessionStore.java       |  5 +-
 .../state/internals/CachingWindowStore.java        |  3 +-
 .../internals/ChangeLoggingKeyValueBytesStore.java | 11 +++
 .../state/internals/InMemoryKeyValueStore.java     | 22 ++---
 .../state/internals/InnerMeteredKeyValueStore.java |  6 +-
 .../streams/state/internals/MemoryLRUCache.java    | 33 +++++---
 .../state/internals/MemoryNavigableLRUCache.java   |  7 --
 .../state/internals/MeteredKeyValueBytesStore.java | 10 +--
 .../state/internals/MeteredSessionStore.java       | 14 ++--
 .../streams/state/internals/RocksDBStore.java      | 12 +--
 .../state/internals/SegmentedBytesStore.java       |  4 +-
 .../internals/SerializedKeyValueIterator.java      | 70 ----------------
 .../internals/WindowStoreIteratorWrapper.java      |  5 +-
 .../internals/WrappedSessionStoreIterator.java     |  3 +-
 .../kstream/internals/KTableFilterTest.java        | 18 +++--
 .../state/internals/AbstractKeyValueStoreTest.java | 75 +++++++++++++++--
 .../state/internals/CachingKeyValueStoreTest.java  | 26 +++---
 .../internals/InMemoryKeyValueLoggedStoreTest.java | 23 +-----
 .../state/internals/InMemoryKeyValueStoreTest.java | 27 +++----
 .../state/internals/InMemoryLRUCacheStoreTest.java | 29 +++----
 .../state/internals/RocksDBKeyValueStoreTest.java  | 31 +++----
 .../internals/SerializedKeyValueIteratorTest.java  | 94 ----------------------
 .../apache/kafka/test/MockProcessorContext.java    | 12 ++-
 32 files changed, 269 insertions(+), 355 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index 48dd12e..89603fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -160,7 +160,8 @@ public class Materialized<K, V, S extends StateStore> {
      * Set the valueSerde the materialized {@link StateStore} will use.
      *
      * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
-     *                   serde from configs will be used
+     *                   serde from configs will be used. If the serialized bytes is null for put operations,
+     *                   it is treated as delete operation
      * @return itself
      */
     public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
index 73e6732..b12e94e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
@@ -20,6 +20,11 @@ import org.apache.kafka.common.utils.Bytes;
 
 /**
  * A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore<Bytes, byte[]>} instances of type &lt;Byte, byte[]&gt;.
+ *
+ * For any stores implementing the {@link KeyValueStore KeyValueStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ *
+ * 1. Null value bytes in put operations should be treated as delete.
+ * 2. If the key does not exist, get operations should return null value bytes.
  */
 public interface KeyValueBytesStoreSupplier extends StoreSupplier<KeyValueStore<Bytes, byte[]>> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index 129ae6b..3685229 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -33,7 +33,8 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
      * Update the value associated with this key
      *
      * @param key The key to associate the value to
-     * @param value The value, it can be null.
+     * @param value The value to update, it can be null;
+     *              if the serialized bytes are also null it is interpreted as deletes
      * @throws NullPointerException If null is used for key.
      */
     void put(K key, V value);
@@ -43,7 +44,8 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
      * is already associated with the key
      *
      * @param key The key to associate the value to
-     * @param value The value, it can be null
+     * @param value The value to update, it can be null;
+     *              if the serialized bytes are also null it is interpreted as deletes
      * @return The old value or null if there is no such key.
      * @throws NullPointerException If null is used for key.
      */
@@ -52,7 +54,8 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
     /**
      * Update all the given key/value pairs
      *
-     * @param entries A list of entries to put into the store.
+     * @param entries A list of entries to put into the store;
+     *              if the serialized bytes are also null it is interpreted as deletes
      * @throws NullPointerException If null is used for key.
      */
     void putAll(List<KeyValue<K, V>> entries);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
index e5523da..04b0ceb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -19,7 +19,12 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.utils.Bytes;
 
 /**
- * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>>} instances of type &lt;Byte, byte[]&gt;.
+ * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>} instances of type &lt;Byte, byte[]&gt;.
+ *
+ * For any stores implementing the {@link SessionStore SessionStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ *
+ * 1. Null value bytes in put operations should be treated as delete.
+ * 2. Null value bytes should never be returned in range query results.
  */
 public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index fcbce5f..c1a6993 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -33,7 +33,8 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
      * This iterator must be closed after use.
      *
      * @param key the key to return sessions for
-     * @param earliestSessionEndTime
+     * @param earliestSessionEndTime the end timestamp of the earliest session to search for
+     * @param latestSessionStartTime the end timestamp of the latest session to search for
      * @return iterator of sessions with the matching key and aggregated values
      * @throws NullPointerException If null is used for key.
      */
@@ -47,7 +48,8 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
      *
      * @param keyFrom The first key that could be in the range
      * @param keyTo The last key that could be in the range
-     * @param earliestSessionEndTime
+     * @param earliestSessionEndTime the end timestamp of the earliest session to search for
+     * @param latestSessionStartTime the end timestamp of the latest session to search for
      * @return iterator of sessions with the matching keys and aggregated values
      * @throws NullPointerException If null is used for any key.
      */
@@ -63,7 +65,8 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
     /**
      * Write the aggregated value for the provided key to the store
      * @param sessionKey key of the session to write
-     * @param aggregate  the aggregated value for the session
+     * @param aggregate  the aggregated value for the session, it can be null;
+     *                   if the serialized bytes are also null it is interpreted as deletes
      * @throws NullPointerException If null is used for sessionKey.
      */
     void put(final Windowed<K> sessionKey, final AGG aggregate);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 0ce6d9e..0aaa2b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -198,7 +198,8 @@ public class Stores {
      * Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
      * @param supplier      a {@link WindowBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
-     * @param valueSerde    the value serde to use
+     * @param valueSerde    the value serde to use; if the serialized bytes is null for put operations,
+     *                      it is treated as delete
      * @param <K>           key type
      * @param <V>           value type
      * @return an instance of {@link StoreBuilder} than can build a {@link WindowStore}
@@ -214,7 +215,8 @@ public class Stores {
      * Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
      * @param supplier      a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
-     * @param valueSerde    the value serde to use
+     * @param valueSerde    the value serde to use; if the serialized bytes is null for put operations,
+     *                      it is treated as delete
      * @param <K>           key type
      * @param <V>           value type
      * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
@@ -230,7 +232,8 @@ public class Stores {
      * Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
      * @param supplier      a {@link SessionBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
-     * @param valueSerde    the value serde to use
+     * @param valueSerde    the value serde to use; if the serialized bytes is null for put operations,
+     *                      it is treated as delete
      * @param <K>           key type
      * @param <V>           value type
      * @return an instance of {@link StoreBuilder} than can build a {@link SessionStore}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
index 5fbe6b0..9cf70c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -19,7 +19,12 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.utils.Bytes;
 
 /**
- * A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>>} instances of type &lt;Byte, byte[]&gt;.
+ * A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>} instances of type &lt;Byte, byte[]&gt;.
+ *
+ * For any stores implementing the {@link WindowStore WindowStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
+ *
+ * 1. Null value bytes in put operations should be treated as delete.
+ * 2. Null value bytes should never be returned in range query results.
  */
 public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Bytes, byte[]>> {
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 6a4d5f6..d2f6ad8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -30,7 +30,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * Put a key-value pair with the current wall-clock time as the timestamp
      * into the corresponding window
      * @param key The key to associate the value to
-     * @param value The value, it can be null
+     * @param value The value to update, it can be null;
+     *              if the serialized bytes are also null it is interpreted as deletes
      * @throws NullPointerException If null is used for key.
      */
     void put(K key, V value);
@@ -38,7 +39,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
     /**
      * Put a key-value pair with the given timestamp into the corresponding window
      * @param key The key to associate the value to
-     * @param value The value, it can be null
+     * @param value The value; can be null
      * @throws NullPointerException If null is used for key.
      */
     void put(K key, V value, long timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f9ab3f1..82a6ac7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -91,7 +91,12 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         try {
             context.setRecordContext(entry.recordContext());
             if (flushListener != null) {
-                final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
+                V oldValue = null;
+                if (sendOldValues) {
+                    final byte[] oldBytesValue = underlying.get(entry.key());
+                    oldValue = oldBytesValue == null ? null : serdes.valueFrom(oldBytesValue);
+                }
+                // we rely on underlying store to handle null new value bytes as deletes
                 underlying.put(entry.key(), entry.newValue());
                 flushListener.apply(serdes.keyFrom(entry.key().get()),
                                     serdes.valueFrom(entry.newValue()),
@@ -141,6 +146,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public byte[] get(final Bytes key) {
+        Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         Lock theLock;
         if (Thread.currentThread().equals(streamThread)) {
@@ -150,7 +156,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         }
         theLock.lock();
         try {
-            Objects.requireNonNull(key);
             return getInternal(key);
         } finally {
             theLock.unlock();
@@ -212,15 +217,15 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         validateStoreOpen();
         lock.writeLock().lock();
         try {
+            // for null bytes, we still put it into cache indicating tombstones
             putInternal(key, value);
         } finally {
             lock.writeLock().unlock();
         }
     }
 
-    private void putInternal(final Bytes rawKey, final byte[] value) {
-        Objects.requireNonNull(rawKey, "key cannot be null");
-        cache.put(cacheName, rawKey, new LRUCacheEntry(value, true, context.offset(),
+    private void putInternal(final Bytes key, final byte[] value) {
+        cache.put(cacheName, key, new LRUCacheEntry(value, true, context.offset(),
               context.timestamp(), context.partition(), context.topic()));
     }
 
@@ -242,9 +247,11 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        validateStoreOpen();
         lock.writeLock().lock();
         try {
             for (KeyValue<Bytes, byte[]> entry : entries) {
+                Objects.requireNonNull(entry.key, "key cannot be null");
                 put(entry.key, entry.value);
             }
         } finally {
@@ -254,19 +261,22 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public byte[] delete(final Bytes key) {
+        Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         lock.writeLock().lock();
         try {
-            Objects.requireNonNull(key);
-            final byte[] v = getInternal(key);
-            cache.delete(cacheName, key);
-            underlying.delete(key);
-            return v;
+            return deleteInternal(key);
         } finally {
             lock.writeLock().unlock();
         }
     }
 
+    private byte[] deleteInternal(final Bytes key) {
+        final byte[] v = getInternal(key);
+        putInternal(key, null);
+        return v;
+    }
+
     KeyValueStore<Bytes, byte[]> underlying() {
         return underlying;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 05851e5..31b9d75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -160,8 +160,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         return findSessions(from, to, 0, Long.MAX_VALUE);
     }
 
-
-
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final Bytes binaryKey = cacheFunction.key(entry.key());
         final RecordContext current = context.recordContext();
@@ -183,8 +181,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     }
 
     private AGG fetchPrevious(final Bytes rawKey, final Window window) {
-        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore
-                .findSessions(rawKey, window.start(), window.end())) {
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore.findSessions(rawKey, window.start(), window.end())) {
             if (!iterator.hasNext()) {
                 return null;
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 75acd77..ad0bd99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -112,8 +112,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
             context.setRecordContext(entry.recordContext());
             try {
                 final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
-                flushListener.apply(windowedKey,
-                                    serdes.valueFrom(entry.newValue()), oldValue);
+                flushListener.apply(windowedKey, serdes.valueFrom(entry.newValue()), oldValue);
             } finally {
                 context.setRecordContext(current);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 94ee275..0fdd3e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -45,6 +45,17 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
                 ProcessorStateManager.storeChangelogTopic(
                     context.applicationId(),
                     inner.name())));
+
+        // if the inner store is an LRU cache, add the eviction listener to log removed record
+        if (inner instanceof MemoryLRUCache) {
+            ((MemoryLRUCache<Bytes, byte[]>) inner).whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<Bytes, byte[]>() {
+                @Override
+                public void apply(Bytes key, byte[] value) {
+                    // pass null to indicate removal
+                    changeLogger.logChange(key, null);
+                }
+            });
+        }
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 929d584..2cdfc4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -63,7 +63,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void init(ProcessorContext context, StateStore root) {
+    public void init(final ProcessorContext context, final StateStore root) {
         // construct the serde
         this.serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
@@ -99,17 +99,21 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized V get(K key) {
+    public synchronized V get(final K key) {
         return this.map.get(key);
     }
 
     @Override
-    public synchronized void put(K key, V value) {
-        this.map.put(key, value);
+    public synchronized void put(final K key, final V value) {
+        if (value == null) {
+            this.map.remove(key);
+        } else {
+            this.map.put(key, value);
+        }
     }
 
     @Override
-    public synchronized V putIfAbsent(K key, V value) {
+    public synchronized V putIfAbsent(final K key, final V value) {
         V originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
@@ -118,18 +122,18 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized void putAll(List<KeyValue<K, V>> entries) {
+    public synchronized void putAll(final List<KeyValue<K, V>> entries) {
         for (KeyValue<K, V> entry : entries)
             put(entry.key, entry.value);
     }
 
     @Override
-    public synchronized V delete(K key) {
+    public synchronized V delete(final K key) {
         return this.map.remove(key);
     }
 
     @Override
-    public synchronized KeyValueIterator<K, V> range(K from, K to) {
+    public synchronized KeyValueIterator<K, V> range(final K from, final K to) {
         return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
     }
 
@@ -158,7 +162,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
         private final Iterator<Map.Entry<K, V>> iter;
 
-        private InMemoryKeyValueIterator(Iterator<Map.Entry<K, V>> iter) {
+        private InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter) {
             this.iter = iter;
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index d076a05..40e2d43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -75,9 +75,9 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
 
     // always wrap the store with the metered store
     InnerMeteredKeyValueStore(final KeyValueStore<IK, IV> inner,
-                                     final String metricScope,
-                                     final TypeConverter<K, IK, V, IV> typeConverter,
-                                     final Time time) {
+                              final String metricScope,
+                              final TypeConverter<K, IK, V, IV> typeConverter,
+                              final Time time) {
         super(inner);
         this.inner = inner;
         this.metricScope = metricScope;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index f1aa96f..2785540 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -61,9 +61,12 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
                                             // in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this.
     private volatile boolean open = true;
 
-    EldestEntryRemovalListener<K, V> listener;
+    private EldestEntryRemovalListener<K, V> listener;
 
-    MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
+    MemoryLRUCache(final String name,
+                   final int maxCacheSize,
+                   final Serde<K> keySerde,
+                   final Serde<V> valueSerde) {
         this.name = name;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
@@ -87,7 +90,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
         return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
     }
 
-    MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+    MemoryLRUCache<K, V> whenEldestRemoved(final EldestEntryRemovalListener<K, V> listener) {
         this.listener = listener;
 
         return this;
@@ -114,7 +117,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
                 restoring = true;
                 // check value for null, to avoid  deserialization error.
                 if (value == null) {
-                    put(serdes.keyFrom(key), null);
+                    delete(serdes.keyFrom(key));
                 } else {
                     put(serdes.keyFrom(key), serdes.valueFrom(value));
                 }
@@ -134,19 +137,24 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized V get(K key) {
+    public synchronized V get(final K key) {
         Objects.requireNonNull(key);
+
         return this.map.get(key);
     }
 
     @Override
-    public synchronized void put(K key, V value) {
+    public synchronized void put(final K key, final V value) {
         Objects.requireNonNull(key);
-        this.map.put(key, value);
+        if (value == null) {
+            this.map.remove(key);
+        } else {
+            this.map.put(key, value);
+        }
     }
 
     @Override
-    public synchronized V putIfAbsent(K key, V value) {
+    public synchronized V putIfAbsent(final K key, final V value) {
         Objects.requireNonNull(key);
         V originalValue = get(key);
         if (originalValue == null) {
@@ -156,23 +164,22 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void putAll(List<KeyValue<K, V>> entries) {
+    public void putAll(final List<KeyValue<K, V>> entries) {
         for (KeyValue<K, V> entry : entries)
             put(entry.key, entry.value);
     }
 
     @Override
-    public synchronized V delete(K key) {
+    public synchronized V delete(final K key) {
         Objects.requireNonNull(key);
-        V value = this.map.remove(key);
-        return value;
+        return this.map.remove(key);
     }
 
     /**
      * @throws UnsupportedOperationException
      */
     @Override
-    public KeyValueIterator<K, V> range(K from, K to) {
+    public KeyValueIterator<K, V> range(final K from, final K to) {
         throw new UnsupportedOperationException("MemoryLRUCache does not support range() function.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 268042d..9500e8e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -32,13 +32,6 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
     }
 
     @Override
-    public MemoryNavigableLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
-        this.listener = listener;
-
-        return this;
-    }
-
-    @Override
     public KeyValueIterator<K, V> range(K from, K to) {
         final TreeMap<K, V> treeMap = toTreeMap();
         return new DelegatingPeekingKeyValueIterator<>(name(), new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from, true, to, true).iterator(), treeMap));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
index 35647b7..3a30c10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
@@ -62,9 +62,7 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS
 
             @Override
             public byte[] innerValue(final V value) {
-                if (value == null) {
-                    return null;
-                }
+                // do not check on null, but rely on user serde to handle it
                 return serdes.rawValue(value);
             }
 
@@ -80,12 +78,12 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS
 
             @Override
             public V outerValue(final byte[] value) {
-                return serdes.valueFrom(value);
+                return value == null ? null : serdes.valueFrom(value);
             }
 
             @Override
-            public KeyValue<K, V> outerKeyValue(final KeyValue<Bytes, byte[]> from) {
-                return KeyValue.pair(serdes.keyFrom(from.key.get()), serdes.valueFrom(from.value));
+            public KeyValue<K, V> outerKeyValue(final KeyValue<Bytes, byte[]> keyValue) {
+                return KeyValue.pair(serdes.keyFrom(keyValue.key.get()), keyValue.value == null ? null : serdes.valueFrom(keyValue.value));
             }
 
             @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 8d9065c..1a9ac20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -92,7 +92,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
                                                          final long earliestSessionEndTime,
                                                          final long latestSessionStartTime) {
         Objects.requireNonNull(key, "key cannot be null");
-        final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
+        final Bytes bytesKey = keyBytes(key);
         return new MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKey,
                                                                         earliestSessionEndTime,
                                                                         latestSessionStartTime),
@@ -109,8 +109,8 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
                                                          final long latestSessionStartTime) {
         Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
         Objects.requireNonNull(keyTo, "keyTo cannot be null");
-        final Bytes bytesKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom));
-        final Bytes bytesKeyTo = Bytes.wrap(serdes.rawKey(keyTo));
+        final Bytes bytesKeyFrom = keyBytes(keyFrom);
+        final Bytes bytesKeyTo = keyBytes(keyTo);
         return new MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKeyFrom,
                                                                         bytesKeyTo,
                                                                         earliestSessionEndTime,
@@ -126,7 +126,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         Objects.requireNonNull(sessionKey, "sessionKey can't be null");
         final long startNs = time.nanoseconds();
         try {
-            final Bytes key = Bytes.wrap(serdes.rawKey(sessionKey.key()));
+            final Bytes key = keyBytes(sessionKey.key());
             inner.remove(new Windowed<>(key, sessionKey.window()));
         } finally {
             this.metrics.recordLatency(removeTime, startNs, time.nanoseconds());
@@ -138,13 +138,17 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         Objects.requireNonNull(sessionKey, "sessionKey can't be null");
         long startNs = time.nanoseconds();
         try {
-            final Bytes key = Bytes.wrap(serdes.rawKey(sessionKey.key()));
+            final Bytes key = keyBytes(sessionKey.key());
             this.inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
     }
 
+    private Bytes keyBytes(final K key) {
+        return Bytes.wrap(serdes.rawKey(key));
+    }
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 4c7e61b..67ec915 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -229,11 +229,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     public synchronized V get(K key) {
         validateStoreOpen();
         byte[] byteValue = getInternal(serdes.rawKey(key));
-        if (byteValue == null) {
-            return null;
-        } else {
-            return serdes.valueFrom(byteValue);
-        }
+        return byteValue  == null ? null : serdes.valueFrom(byteValue);
     }
 
     private void validateStoreOpen() {
@@ -341,11 +337,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             for (KeyValue<K, V> entry : entries) {
                 Objects.requireNonNull(entry.key, "key cannot be null");
                 final byte[] rawKey = serdes.rawKey(entry.key);
-                if (entry.value == null) {
+                final byte[] rawValue = serdes.rawValue(entry.value);
+                if (rawValue == null) {
                     batch.remove(rawKey);
                 } else {
-                    final byte[] value = serdes.rawValue(entry.value);
-                    batch.put(rawKey, value);
+                    batch.put(rawKey, rawValue);
                 }
             }
             db.write(wOptions, batch);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 19cf319..a533185 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -62,8 +62,8 @@ public interface SegmentedBytesStore extends StateStore {
     /**
      * Gets all the key-value pairs that belong to the windows within in the given time range.
      *
-     * @param timeFrom the beginning of the time slot from which to search
-     * @param timeTo   the end of the time slot from which to search
+     * @param from the beginning of the time slot from which to search
+     * @param to   the end of the time slot from which to search
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
      * @throws NullPointerException if null is used for any key
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
deleted file mode 100644
index 2c050f5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.NoSuchElementException;
-
-class SerializedKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
-
-    private final KeyValueIterator<Bytes, byte[]> bytesIterator;
-    private final StateSerdes<K, V> serdes;
-
-    SerializedKeyValueIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator,
-                               final StateSerdes<K, V> serdes) {
-
-        this.bytesIterator = bytesIterator;
-        this.serdes = serdes;
-    }
-
-    @Override
-    public void close() {
-        bytesIterator.close();
-    }
-
-    @Override
-    public K peekNextKey() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-        final Bytes bytes = bytesIterator.peekNextKey();
-        return serdes.keyFrom(bytes.get());
-    }
-
-    @Override
-    public boolean hasNext() {
-        return bytesIterator.hasNext();
-    }
-
-    @Override
-    public KeyValue<K, V> next() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-        final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-        return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value));
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
index 4fd6f3e..4cb85d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
@@ -149,8 +149,9 @@ class WindowStoreIteratorWrapper<K, V> {
         final StateSerdes<K, V> serdes;
         final long windowSize;
 
-        WrappedKeyValueIterator(
-            KeyValueIterator<Bytes, byte[]> bytesIterator, StateSerdes<K, V> serdes, long windowSize) {
+        WrappedKeyValueIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator,
+                                final StateSerdes<K, V> serdes,
+                                final long windowSize) {
             this.bytesIterator = bytesIterator;
             this.serdes = serdes;
             this.windowSize = windowSize;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
index 6fd9636..c5ea70b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
@@ -77,7 +77,8 @@ class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>,
     @Override
     public KeyValue<Windowed<K>, V> next() {
         final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-        return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value));
+        return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()),
+                             serdes.valueFrom(next.value));
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 01236ad..657e05d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -53,8 +53,10 @@ public class KTableFilterTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    private void doTestKTable(final StreamsBuilder builder, final KTable<String, Integer> table2,
-                              final KTable<String, Integer> table3, final String topic1) {
+    private void doTestKTable(final StreamsBuilder builder,
+                              final KTable<String, Integer> table2,
+                              final KTable<String, Integer> table3,
+                              final String topic) {
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
         MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
@@ -62,13 +64,13 @@ public class KTableFilterTest {
 
         driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
 
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 2);
-        driver.process(topic1, "C", 3);
-        driver.process(topic1, "D", 4);
+        driver.process(topic, "A", 1);
+        driver.process(topic, "B", 2);
+        driver.process(topic, "C", 3);
+        driver.process(topic, "D", 4);
         driver.flushState();
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
+        driver.process(topic, "A", null);
+        driver.process(topic, "B", null);
         driver.flushState();
 
         proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 398c4c5..0583e91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -32,12 +36,11 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
 
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
-                                                                      Class<K> keyClass, Class<V> valueClass,
-                                                                      boolean useContextSerdes);
+    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);
 
     protected MockProcessorContext context;
     protected KeyValueStore<Integer, String> store;
@@ -48,7 +51,7 @@ public abstract class AbstractKeyValueStoreTest {
         driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
         context = (MockProcessorContext) driver.context();
         context.setTime(10);
-        store = createKeyValueStore(context, Integer.class, String.class, false);
+        store = createKeyValueStore(context);
     }
 
     @After
@@ -68,6 +71,66 @@ public abstract class AbstractKeyValueStoreTest {
     }
 
     @Test
+    public void shouldNotIncludeDeletedFromRangeResult() {
+        store.close();
+
+        final Serializer<String> serializer = new StringSerializer() {
+            private int numCalls = 0;
+
+            @Override
+            public byte[] serialize(final String topic, final String data) {
+                if (++numCalls > 3) {
+                    fail("Value serializer is called; it should never happen");
+                }
+
+                return super.serialize(topic, data);
+            }
+        };
+
+        context.setValueSerde(Serdes.serdeFrom(serializer, new StringDeserializer()));
+        store = createKeyValueStore(driver.context());
+
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.delete(0);
+        store.delete(1);
+
+        // should not include deleted records in iterator
+        final Map<Integer, String> expectedContents = Collections.singletonMap(2, "two");
+        assertEquals(expectedContents, getContents(store.all()));
+    }
+
+    @Test
+    public void shouldDeleteIfSerializedValueIsNull() {
+        store.close();
+
+        final Serializer<String> serializer = new StringSerializer() {
+            @Override
+            public byte[] serialize(final String topic, final String data) {
+                if (data.equals("null")) {
+                    // will be serialized to null bytes, indicating deletes
+                    return null;
+                }
+                return super.serialize(topic, data);
+            }
+        };
+
+        context.setValueSerde(Serdes.serdeFrom(serializer, new StringDeserializer()));
+        store = createKeyValueStore(driver.context());
+
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(0, "null");
+        store.put(1, "null");
+
+        // should not include deleted records in iterator
+        final Map<Integer, String> expectedContents = Collections.singletonMap(2, "two");
+        assertEquals(expectedContents, getContents(store.all()));
+    }
+
+    @Test
     public void testPutGetRange() {
         // Verify that the store reads and writes correctly ...
         store.put(0, "zero");
@@ -157,7 +220,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        store = createKeyValueStore(driver.context());
         context.restore(store.name(), driver.restoredEntries());
 
         // Verify that the store's contents were properly restored ...
@@ -179,7 +242,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        store = createKeyValueStore(driver.context());
         context.restore(store.name(), driver.restoredEntries());
         // Verify that the store's contents were properly restored ...
         assertEquals(0, driver.checkForRestoredEntries(store));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 97a2fbf..3ff343a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.After;
@@ -84,22 +86,14 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context,
-                                                             final Class<K> keyClass,
-                                                             final Class<V> valueClass,
-                                                             final boolean useContextSerdes) {
-        final String storeName = "cache-store";
-
-
-        final Stores.PersistentKeyValueFactory<K, V> factory = Stores
-                .create(storeName)
-                .withKeys(Serdes.serdeFrom(keyClass))
-                .withValues(Serdes.serdeFrom(valueClass))
-                .persistent()
-                .enableCaching();
-
-
-        final KeyValueStore<K, V> store = (KeyValueStore<K, V>) factory.build().get();
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore("cache-store"),
+                (Serde<K>) context.keySerde(),
+                (Serde<V>) context.valueSerde())
+                .withCachingEnabled();
+
+        final KeyValueStore<K, V> store = (KeyValueStore<K, V>) storeBuilder.build();
         final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
 
         final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index d24a90f..adaab00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -36,27 +35,11 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            ProcessorContext context,
-            Class<K> keyClass,
-            Class<V> valueClass,
-            boolean useContextSerdes) {
-
-        final Serde<K> keySerde;
-        final Serde<V> valueSerde;
-
-        if (useContextSerdes) {
-            keySerde = (Serde<K>) context.keySerde();
-            valueSerde = (Serde<V>) context.valueSerde();
-        } else {
-            keySerde = Serdes.serdeFrom(keyClass);
-            valueSerde = Serdes.serdeFrom(valueClass);
-        }
-
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
         final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.inMemoryKeyValueStore("my-store"),
-                keySerde,
-                valueSerde)
+                (Serde<K>) context.keySerde(),
+                (Serde<V>) context.valueSerde())
                 .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
 
         final StateStore store = storeBuilder.build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 541c003..ef5d6dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
@@ -30,22 +32,15 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            ProcessorContext context,
-            Class<K> keyClass,
-            Class<V> valueClass,
-            boolean useContextSerdes) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("my-store"),
+                (Serde<K>) context.keySerde(),
+                (Serde<V>) context.valueSerde());
 
-        StateStoreSupplier supplier;
-        if (useContextSerdes) {
-            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().build();
-        } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
-        }
-
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        final StateStore store = storeBuilder.build();
         store.init(context, store);
-        return store;
+        return (KeyValueStore<K, V>) store;
     }
 
     @Test
@@ -59,7 +54,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
         driver.addEntryToRestoreLog(3, "three");
         driver.addEntryToRestoreLog(0, null);
 
-        store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        store = createKeyValueStore(driver.context());
         context.restore(store.name(), driver.restoredEntries());
 
         assertEquals(3, driver.sizeOf(store));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 7dda585..2d39ae7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
@@ -36,22 +38,17 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            ProcessorContext context,
-            Class<K> keyClass,
-            Class<V> valueClass,
-            boolean useContextSerdes) {
-
-        StateStoreSupplier supplier;
-        if (useContextSerdes) {
-            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().maxEntries(10).build();
-        } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build();
-        }
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.lruMap("my-store", 10),
+                (Serde<K>) context.keySerde(),
+                (Serde<V>) context.valueSerde());
 
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        final StateStore store = storeBuilder.build();
         store.init(context, store);
-        return store;
+
+        return (KeyValueStore<K, V>) store;
     }
 
     @Test
@@ -157,7 +154,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        store = createKeyValueStore(driver.context());
         context.restore(store.name(), driver.restoredEntries());
         // Verify that the store's changelog does not get more appends ...
         assertEquals(0, driver.numFlushedEntryStored());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 5aaf82f..ec2e338 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 import org.rocksdb.Options;
@@ -36,29 +39,15 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context,
-                                                             final Class<K> keyClass,
-                                                             final Class<V> valueClass,
-                                                             final boolean useContextSerdes) {
-        final Stores.PersistentKeyValueFactory<?, ?> factory;
-        if (useContextSerdes) {
-            factory = Stores
-                    .create("my-store")
-                    .withKeys(context.keySerde())
-                    .withValues(context.valueSerde())
-                    .persistent();
-
-        } else {
-            factory = Stores
-                    .create("my-store")
-                    .withKeys(keyClass)
-                    .withValues(valueClass)
-                    .persistent();
-        }
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore("my-store"),
+                (Serde<K>) context.keySerde(),
+                (Serde<V>) context.valueSerde());
 
-        final KeyValueStore<K, V> store = (KeyValueStore<K, V>) factory.build().get();
+        final StateStore store = storeBuilder.build();
         store.init(context, store);
-        return store;
+        return (KeyValueStore<K, V>) store;
     }
 
     public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java
deleted file mode 100644
index d11b9b4..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIteratorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.KeyValueIteratorStub;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class SerializedKeyValueIteratorTest {
-
-    private final StateSerdes<String, String> serdes = new StateSerdes<>("blah", Serdes.String(), Serdes.String());
-    private final Iterator<KeyValue<Bytes, byte[]>> iterator
-            = Arrays.asList(KeyValue.pair(Bytes.wrap("hi".getBytes()), "there".getBytes()),
-                            KeyValue.pair(Bytes.wrap("hello".getBytes()), "world".getBytes()))
-            .iterator();
-    private final DelegatingPeekingKeyValueIterator<Bytes, byte[]> peeking
-            = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(iterator));
-    private final SerializedKeyValueIterator<String, String> serializedKeyValueIterator
-            = new SerializedKeyValueIterator<>(peeking, serdes);
-
-    @Test
-    public void shouldReturnTrueOnHasNextWhenMoreResults() {
-        assertTrue(serializedKeyValueIterator.hasNext());
-    }
-
-    @Test
-    public void shouldReturnNextValueWhenItExists() {
-        assertThat(serializedKeyValueIterator.next(), equalTo(KeyValue.pair("hi", "there")));
-        assertThat(serializedKeyValueIterator.next(), equalTo(KeyValue.pair("hello", "world")));
-    }
-
-    @Test
-    public void shouldReturnFalseOnHasNextWhenNoMoreResults() {
-        advanceIteratorToEnd();
-        assertFalse(serializedKeyValueIterator.hasNext());
-    }
-
-    @Test
-    public void shouldThrowNoSuchElementOnNextWhenIteratorExhausted() {
-        advanceIteratorToEnd();
-        try {
-            serializedKeyValueIterator.next();
-            fail("Expected NoSuchElementException on exhausted iterator");
-        } catch (final NoSuchElementException nse) {
-            // pass
-        }
-    }
-
-    @Test
-    public void shouldPeekNextKey() {
-        assertThat(serializedKeyValueIterator.peekNextKey(), equalTo("hi"));
-        serializedKeyValueIterator.next();
-        assertThat(serializedKeyValueIterator.peekNextKey(), equalTo("hello"));
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void shouldThrowUnsupportedOperationOnRemove() {
-        serializedKeyValueIterator.remove();
-    }
-
-    private void advanceIteratorToEnd() {
-        serializedKeyValueIterator.next();
-        serializedKeyValueIterator.next();
-    }
-
-
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index ce6cca8..06137fb 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -50,12 +50,12 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
 
     private final File stateDir;
     private final Metrics metrics;
-    private final Serde<?> keySerde;
-    private final Serde<?> valSerde;
     private final RecordCollector.Supplier recordCollectorSupplier;
     private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
     private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
 
+    private Serde<?> keySerde;
+    private Serde<?> valSerde;
     private long timestamp = -1L;
 
     public MockProcessorContext(final File stateDir,
@@ -121,6 +121,14 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
         return recordCollector;
     }
 
+    public void setKeySerde(final Serde<?> keySerde) {
+        this.keySerde = keySerde;
+    }
+
+    public void setValueSerde(final Serde<?> valSerde) {
+        this.valSerde = valSerde;
+    }
+
     // serdes will override whatever specified in the configs
     @Override
     public Serde<?> keySerde() {

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.