You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/08 04:16:50 UTC

kafka git commit: KAFKA-5314; exception handling and cleanup for state stores

Repository: kafka
Updated Branches:
  refs/heads/trunk 3de2d296e -> f4e0deca4


KAFKA-5314; exception handling and cleanup for state stores

Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3135 from enothereska/exceptions-stores-KAFKA-5314


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4e0deca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4e0deca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4e0deca

Branch: refs/heads/trunk
Commit: f4e0deca43595f30b9c6f1a1666192416ecc8a9c
Parents: 3de2d29
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Jun 7 21:16:46 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 7 21:16:46 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/KeyValueStore.java      | 10 ++--
 .../streams/state/ReadOnlySessionStore.java     |  3 ++
 .../streams/state/ReadOnlyWindowStore.java      |  2 +
 .../kafka/streams/state/SessionStore.java       |  4 ++
 .../apache/kafka/streams/state/WindowStore.java |  6 +++
 .../state/internals/CachingKeyValueStore.java   |  7 +--
 .../CompositeReadOnlyKeyValueStore.java         |  5 ++
 .../streams/state/internals/MemoryLRUCache.java |  5 ++
 .../streams/state/internals/RocksDBStore.java   |  8 +--
 .../internals/AbstractKeyValueStoreTest.java    | 53 ++++++++++++++++++++
 .../internals/CachingKeyValueStoreTest.java     | 49 ++++++++++++++----
 .../internals/CachingSessionStoreTest.java      | 40 +++++++++++++++
 .../state/internals/CachingWindowStoreTest.java | 27 +++++++++-
 .../CompositeReadOnlyKeyValueStoreTest.java     | 15 ++++++
 .../InMemoryKeyValueLoggedStoreTest.java        | 49 ++++++++++++++++++
 .../internals/RocksDBSessionStoreTest.java      | 40 +++++++++++++++
 .../state/internals/RocksDBStoreTest.java       | 48 +++++++++++++++---
 .../state/internals/RocksDBWindowStoreTest.java | 30 +++++++++++
 18 files changed, 372 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index bd18835..129ae6b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -33,8 +33,8 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
      * Update the value associated with this key
      *
      * @param key The key to associate the value to
-     * @param value The value
-     * @throws NullPointerException If null is used for key or value.
+     * @param value The value; it can be null.
+     * @throws NullPointerException If null is used for key.
      */
     void put(K key, V value);
 
@@ -43,9 +43,9 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
      * is already associated with the key
      *
      * @param key The key to associate the value to
-     * @param value The value
+     * @param value The value; it can be null
      * @return The old value or null if there is no such key.
-     * @throws NullPointerException If null is used for key or value.
+     * @throws NullPointerException If null is used for key.
      */
     V putIfAbsent(K key, V value);
 
@@ -53,7 +53,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
      * Update all the given key/value pairs
      *
      * @param entries A list of entries to put into the store.
-     * @throws NullPointerException If null is used for any key or value.
+     * @throws NullPointerException If null is used for any key.
      */
     void putAll(List<KeyValue<K, V>> entries);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
index 094e3fc..e3859fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
@@ -37,6 +37,8 @@ public interface ReadOnlySessionStore<K, AGG> {
      *
      * @param    key record key to find aggregated session values for
      * @return   KeyValueIterator containing all sessions for the provided key.
+     * @throws   NullPointerException If null is used for key.
+     *
      */
     KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
 
@@ -49,6 +51,7 @@ public interface ReadOnlySessionStore<K, AGG> {
      * @param    from first key in the range to find aggregated session values for
      * @param    to last key in the range to find aggregated session values for
      * @return   KeyValueIterator containing all sessions for the provided key.
+     * @throws   NullPointerException If null is used for any of the keys.
      */
     KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index b128c58..5252cd6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -56,6 +56,7 @@ public interface ReadOnlyWindowStore<K, V> {
      *
      * @return an iterator over key-value pairs {@code <timestamp, value>}
      * @throws InvalidStateStoreException if the store is not initialized
+     * @throws NullPointerException If null is used for key.
      */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 
@@ -69,6 +70,7 @@ public interface ReadOnlyWindowStore<K, V> {
      * @param timeTo    time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
+     * @throws NullPointerException If null is used for any key.
      */
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index a4cf12e..c98f8ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -33,6 +33,7 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
      * @param key the key to return sessions for
      * @param earliestSessionEndTime
      * @return iterator of sessions with the matching key and aggregated values
+     * @throws NullPointerException If null is used for key.
      */
     KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
 
@@ -44,12 +45,14 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
      * @param keyTo The last key that could be in the range
      * @param earliestSessionEndTime
      * @return iterator of sessions with the matching keys and aggregated values
+     * @throws NullPointerException If null is used for any key.
      */
     KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime);
 
     /**
      * Remove the session aggregated with provided {@link Windowed} key from the store
      * @param sessionKey key of the session to remove
+     * @throws NullPointerException If null is used for sessionKey.
      */
     void remove(final Windowed<K> sessionKey);
 
@@ -57,6 +60,7 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
      * Write the aggregated value for the provided key to the store
      * @param sessionKey key of the session to write
      * @param aggregate  the aggregated value for the session
+     * @throws NullPointerException If null is used for sessionKey.
      */
     void put(final Windowed<K> sessionKey, final AGG aggregate);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index fc8ca6f..6a4d5f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -29,11 +29,17 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
     /**
      * Put a key-value pair with the current wall-clock time as the timestamp
      * into the corresponding window
+     * @param key The key to associate the value to
+     * @param value The value, it can be null
+     * @throws NullPointerException If null is used for key.
      */
     void put(K key, V value);
 
     /**
      * Put a key-value pair with the given timestamp into the corresponding window
+     * @param key The key to associate the value to
+     * @param value The value, it can be null
+     * @throws NullPointerException If null is used for key.
      */
     void put(K key, V value, long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 2a720be..4f86d4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
+import java.util.Objects;
 
 class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V>, CachedStateStore<K, V> {
 
@@ -128,9 +129,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     @Override
     public synchronized V get(final K key) {
         validateStoreOpen();
-        if (key == null) {
-            return null;
-        }
+        Objects.requireNonNull(key);
+
         final byte[] rawKey = serdes.rawKey(key);
         return get(rawKey);
     }
@@ -215,6 +215,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     @Override
     public synchronized V delete(final K key) {
         validateStoreOpen();
+        Objects.requireNonNull(key);
         final byte[] rawKey = serdes.rawKey(key);
         final Bytes bytesKey = Bytes.wrap(rawKey);
         final V v = get(rawKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
index 6366351..3022645 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 
 /**
  * A wrapper over the underlying {@link ReadOnlyKeyValueStore}s found in a {@link
@@ -47,8 +48,10 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
         this.storeName = storeName;
     }
 
+
     @Override
     public V get(final K key) {
+        Objects.requireNonNull(key);
         final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
         for (ReadOnlyKeyValueStore<K, V> store : stores) {
             try {
@@ -66,6 +69,8 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
 
     @Override
     public KeyValueIterator<K, V> range(final K from, final K to) {
+        Objects.requireNonNull(from);
+        Objects.requireNonNull(to);
         final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
             @Override
             public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 988a302..beb9ce1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * An in-memory LRU cache store based on HashSet and HashMap.
@@ -134,16 +135,19 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public synchronized V get(K key) {
+        Objects.requireNonNull(key);
         return this.map.get(key);
     }
 
     @Override
     public synchronized void put(K key, V value) {
+        Objects.requireNonNull(key);
         this.map.put(key, value);
     }
 
     @Override
     public synchronized V putIfAbsent(K key, V value) {
+        Objects.requireNonNull(key);
         V originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
@@ -159,6 +163,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public synchronized V delete(K key) {
+        Objects.requireNonNull(key);
         V value = this.map.remove(key);
         return value;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 7a0b6ee..a3ecb64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -72,7 +71,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private static final int TTL_NOT_USED = -1;
 
-    // TODO: these values should be configurable
     private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
     private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
     private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L;
@@ -164,7 +162,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         try {
             this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
         } catch (IOException e) {
-            throw new StreamsException(e);
+            throw new ProcessorStateException(e);
         }
     }
 
@@ -305,6 +303,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     @Override
     public synchronized KeyValueIterator<K, V> range(K from, K to) {
         validateStoreOpen();
+
         // query rocksdb
         final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), serdes, from, to);
         openIterators.add(rocksDBRangeIterator);
@@ -477,6 +476,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             super(storeName, iter, serdes);
             iter.seek(serdes.rawKey(from));
             this.rawToKey = serdes.rawKey(to);
+            if (this.rawToKey == null) {
+                throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 3eb406b..345639b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -26,6 +26,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
@@ -216,6 +218,56 @@ public abstract class AbstractKeyValueStoreTest {
         assertEquals(false, driver.flushedEntryRemoved(4));
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+        store.put(null, "anyValue");
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerExceptionOnPutNullValue() throws Exception {
+        store.put(1, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutIfAbsentNullKey() throws Exception {
+        store.putIfAbsent(null, "anyValue");
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerExceptionOnPutIfAbsentNullValue() throws Exception {
+        store.putIfAbsent(1, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutAllNullKey() throws Exception {
+        store.putAll(Collections.singletonList(new KeyValue<Integer, String>(null, "anyValue")));
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerExceptionOnPutAllNullKey() throws Exception {
+        store.putAll(Collections.singletonList(new KeyValue<Integer, String>(1, null)));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnDeleteNullKey() throws Exception {
+        store.delete(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnGetNullKey() throws Exception {
+        store.get(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() throws Exception {
+        store.range(null, 2);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRangeNullToKey() throws Exception {
+        store.range(2, null);
+    }
+
     @Test
     public void testSize() {
         assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries());
@@ -225,6 +277,7 @@ public abstract class AbstractKeyValueStoreTest {
         store.put(2, "two");
         store.put(4, "four");
         store.put(5, "five");
+        store.flush();
         assertEquals(5, store.approximateNumEntries());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 2534b01..8106a10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -17,16 +17,20 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
@@ -45,14 +49,14 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
-public class CachingKeyValueStoreTest {
+public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     private final int maxCacheSizeBytes = 150;
     private MockProcessorContext context;
     private CachingKeyValueStore<String, String> store;
     private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
     private ThreadCache cache;
-    private CacheFlushListenerStub<String> cacheFlushListener;
+    private CacheFlushListenerStub<String, String> cacheFlushListener;
     private String topic;
 
     @Before
@@ -74,6 +78,36 @@ public class CachingKeyValueStoreTest {
         context.close();
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context,
+                                                             final Class<K> keyClass,
+                                                             final Class<V> valueClass,
+                                                             final boolean useContextSerdes) {
+        final String storeName = "cache-store";
+
+        final Stores.PersistentKeyValueFactory<?, ?> factory = Stores
+                .create(storeName)
+                .withKeys(Serdes.Bytes())
+                .withValues(Serdes.ByteArray())
+                .persistent();
+
+
+        final KeyValueStore<Bytes, byte[]> underlyingStore = (KeyValueStore<Bytes, byte[]>) factory.build().get();
+        final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
+        final CachingKeyValueStore<K, V> store;
+        if (useContextSerdes) {
+            store = new CachingKeyValueStore<>(underlyingStore,
+                (Serde<K>) context.keySerde(), (Serde<V>) context.valueSerde());
+        } else {
+            store = new CachingKeyValueStore<>(underlyingStore,
+                Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
+        }
+        store.setFlushListener(cacheFlushListener);
+        store.init(context, store);
+        return store;
+    }
+
     @Test
     public void shouldPutGetToFromCache() throws Exception {
         store.put("key", "value");
@@ -215,11 +249,6 @@ public class CachingKeyValueStoreTest {
         store.delete("key");
     }
 
-    @Test
-    public void shouldReturnNullIfKeyIsNull() throws Exception {
-        assertNull(store.get(null));
-    }
-
     private int addItemsToCache() throws IOException {
         int cachedSize = 0;
         int i = 0;
@@ -231,11 +260,11 @@ public class CachingKeyValueStoreTest {
         return i;
     }
 
-    public static class CacheFlushListenerStub<K> implements CacheFlushListener<K, String> {
-        final Map<K, Change<String>> forwarded = new HashMap<>();
+    public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<K, V> {
+        final Map<K, Change<V>> forwarded = new HashMap<>();
 
         @Override
-        public void apply(final K key, final String newValue, final String oldValue) {
+        public void apply(final K key, final V newValue, final V oldValue) {
             forwarded.put(key, new Change<>(newValue, oldValue));
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index bfc20ec..6f39fae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -276,6 +276,46 @@ public class CachingSessionStoreTest {
         cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() throws Exception {
+        cachingStore.findSessions(null, 1L, 2L);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() throws Exception {
+        cachingStore.findSessions(null, "anyKeyTo", 1L, 2L);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() throws Exception {
+        cachingStore.findSessions("anyKeyFrom", null, 1L, 2L);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFetchNullFromKey() throws Exception {
+        cachingStore.fetch(null, "anyToKey");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFetchNullToKey() throws Exception {
+        cachingStore.fetch("anyFromKey", null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFetchNullKey() throws Exception {
+        cachingStore.fetch(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRemoveNullKey() throws Exception {
+        cachingStore.remove(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+        cachingStore.put(null, 1L);
+    }
+
     private List<KeyValue<Windowed<String>, Long>> addSessionsUntilOverflow(final String...sessionIds) {
         final Random random = new Random();
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index faf6e83..e8ad902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -56,7 +56,7 @@ public class CachingWindowStoreTest {
     private MockProcessorContext context;
     private RocksDBSegmentedBytesStore underlying;
     private CachingWindowStore<String, String> cachingStore;
-    private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener;
+    private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> cacheListener;
     private ThreadCache cache;
     private String topic;
     private WindowKeySchema keySchema;
@@ -261,6 +261,31 @@ public class CachingWindowStoreTest {
         );
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+        cachingStore.put(null, "anyValue");
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerExceptionOnPutNullValue() throws Exception {
+        cachingStore.put("a", null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFetchNullKey() throws Exception {
+        cachingStore.fetch(null, 1L, 2L);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() throws Exception {
+        cachingStore.fetch(null, "anyTo", 1L, 2L);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRangeNullToKey() throws Exception {
+        cachingStore.fetch("anyFrom", null, 1L, 2L);
+    }
+
     private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
         return KeyValue.pair(new Windowed<>(key, new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 2e5b872..a8ac65f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -69,6 +69,21 @@ public class CompositeReadOnlyKeyValueStoreTest {
         assertNull(theStore.get("whatever"));
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnGetNullKey() throws Exception {
+        theStore.get(null); // null key lookup on the composite store: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() throws Exception {
+        theStore.range(null, "to"); // first range argument null: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRangeNullToKey() throws Exception {
+        theStore.range("from", null); // second range argument null: must throw NullPointerException
+    }
+
     @Test
     public void shouldReturnValueIfExists() throws Exception {
         stubOneUnderlying.put("key", "value");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
new file mode 100644
index 0000000..fbfbeb8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Collections;
+
+public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest { // overrides createKeyValueStore with an in-memory store that has logging enabled
+
+    @SuppressWarnings("unchecked") // covers the cast of supplier.get() to KeyValueStore<K, V> below
+    @Override
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+            ProcessorContext context,
+            Class<K> keyClass,
+            Class<V> valueClass,
+            boolean useContextSerdes) {
+
+        StateStoreSupplier supplier; // raw type: assigned from either of the two builder chains below
+        if (useContextSerdes) { // serdes come from the ProcessorContext
+            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde())
+                .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build(); // logging config holds the single entry retention.ms=1000
+        } else { // key and value types come from the supplied classes
+            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass)
+                .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build(); // same store name and logging config as the other branch
+        }
+
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get(); // unchecked cast; see the @SuppressWarnings on this method
+        store.init(context, store); // the store is passed as its own second argument (presumably the root store; TODO confirm)
+        return store; // returned after init has been called
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index c30b0e2..bd10db9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -194,6 +194,46 @@ public class RocksDBSessionStoreTest {
         assertThat(rangeResults, equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L)));
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() throws Exception {
+        sessionStore.findSessions(null, 1L, 2L); // three-argument findSessions with a null key: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() throws Exception {
+        sessionStore.findSessions(null, "anyKeyTo", 1L, 2L); // four-argument findSessions, first key null: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() throws Exception {
+        sessionStore.findSessions("anyKeyFrom", null, 1L, 2L); // four-argument findSessions, second key null: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFetchNullFromKey() throws Exception {
+        sessionStore.fetch(null, "anyToKey"); // two-key fetch, first key null: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFetchNullToKey() throws Exception {
+        sessionStore.fetch("anyFromKey", null); // two-key fetch, second key null: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnFetchNullKey() throws Exception {
+        sessionStore.fetch(null); // single-key fetch with a null key: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRemoveNullKey() throws Exception {
+        sessionStore.remove(null); // remove with a null argument: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+        sessionStore.put(null, 1L); // null key with a non-null value: must throw NullPointerException
+    }
+
     static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
         while (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index c43a39a..c020062 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -16,32 +16,46 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
+
+import static org.junit.Assert.assertTrue;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.io.IOException;
 import org.rocksdb.Options;
 
 public class RocksDBStoreTest {
     private final File tempDir = TestUtils.tempDirectory();
 
     private RocksDBStore<String, String> subject;
+    private MockProcessorContext context;
+    private File dir;
 
     @Before
     public void setUp() throws Exception {
         subject = new RocksDBStore<>("test", Serdes.String(), Serdes.String());
+        dir = TestUtils.tempDirectory(); // a second temp directory, distinct from the tempDir field; kept in a field so tests can reference it
+        context = new MockProcessorContext(dir, // shared context built on dir; setUp itself does not call subject.init
+            Serdes.String(),
+            Serdes.String(),
+            new NoOpRecordCollector(),
+            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); // second argument 0 is presumably the cache size; TODO confirm
     }
 
     @After
@@ -71,6 +85,26 @@ public class RocksDBStoreTest {
         assertTrue(MockRocksDbConfigSetter.called);
     }
 
+    @Test(expected = ProcessorStateException.class)
+    public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() throws IOException {
+        final File tmpDir = TestUtils.tempDirectory(); // fresh directory for this test only; the shared dir/context fields are not used here
+        MockProcessorContext tmpContext = new MockProcessorContext(tmpDir,
+            Serdes.String(),
+            Serdes.Long(),
+            new NoOpRecordCollector(),
+            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+        tmpDir.setReadOnly(); // NOTE(review): the boolean result is ignored; if the directory is not actually made read-only, openDB may succeed and the test fails without a clear reason
+
+        subject.openDB(tmpContext); // the test passes only if this call throws ProcessorStateException
+    }
+
+    @Test(expected = ProcessorStateException.class)
+    public void shouldThrowProcessorStateExeptionOnPutDeletedDir() throws IOException { // NOTE(review): "Exeption" in the method name is a typo for "Exception"
+        subject.init(context, subject); // init with the shared context, which setUp built on dir
+        Utils.delete(dir); // delete the directory after the store has been initialised with it
+        subject.put("anyKey", "anyValue"); // write after the directory is gone
+        subject.flush(); // flush follows the put, so the expected ProcessorStateException may come from either call
+    }
 
     public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
         static boolean called;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 040def6..883506d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -714,6 +714,36 @@ public class RocksDBWindowStoreTest {
         )));
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+        windowStore = createWindowStore(context, false, true); // fresh store assigned to the windowStore field for this test
+        windowStore.put(null, "anyValue"); // null key: must throw NullPointerException
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerExceptionOnPutNullValue() throws Exception {
+        windowStore = createWindowStore(context, false, true); // fresh store assigned to the windowStore field for this test
+        windowStore.put(1, null); // null value with a non-null key: the test passes as long as nothing is thrown
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnGetNullKey() throws Exception {
+        windowStore = createWindowStore(context, false, true); // fresh store assigned to the windowStore field for this test
+        windowStore.fetch(null, 1L, 2L); // the "get" in the test name is exercised via the three-argument fetch with a null key
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() throws Exception {
+        windowStore = createWindowStore(context, false, true); // fresh store assigned to the windowStore field for this test
+        windowStore.fetch(null, 2, 1L, 2L); // four-argument fetch, first key null: must throw NullPointerException
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnRangeNullToKey() throws Exception {
+        windowStore = createWindowStore(context, false, true); // fresh store assigned to the windowStore field for this test
+        windowStore.fetch(1, null, 1L, 2L); // four-argument fetch, second key null: must throw NullPointerException
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldFetchAndIterateOverExactBinaryKeys() throws Exception {