You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/08 04:16:50 UTC
kafka git commit: KAFKA-5314; exception handling and cleanup for state stores
Repository: kafka
Updated Branches:
refs/heads/trunk 3de2d296e -> f4e0deca4
KAFKA-5314; exception handling and cleanup for state stores
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3135 from enothereska/exceptions-stores-KAFKA-5314
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4e0deca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4e0deca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4e0deca
Branch: refs/heads/trunk
Commit: f4e0deca43595f30b9c6f1a1666192416ecc8a9c
Parents: 3de2d29
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Jun 7 21:16:46 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 7 21:16:46 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/state/KeyValueStore.java | 10 ++--
.../streams/state/ReadOnlySessionStore.java | 3 ++
.../streams/state/ReadOnlyWindowStore.java | 2 +
.../kafka/streams/state/SessionStore.java | 4 ++
.../apache/kafka/streams/state/WindowStore.java | 6 +++
.../state/internals/CachingKeyValueStore.java | 7 +--
.../CompositeReadOnlyKeyValueStore.java | 5 ++
.../streams/state/internals/MemoryLRUCache.java | 5 ++
.../streams/state/internals/RocksDBStore.java | 8 +--
.../internals/AbstractKeyValueStoreTest.java | 53 ++++++++++++++++++++
.../internals/CachingKeyValueStoreTest.java | 49 ++++++++++++++----
.../internals/CachingSessionStoreTest.java | 40 +++++++++++++++
.../state/internals/CachingWindowStoreTest.java | 27 +++++++++-
.../CompositeReadOnlyKeyValueStoreTest.java | 15 ++++++
.../InMemoryKeyValueLoggedStoreTest.java | 49 ++++++++++++++++++
.../internals/RocksDBSessionStoreTest.java | 40 +++++++++++++++
.../state/internals/RocksDBStoreTest.java | 48 +++++++++++++++---
.../state/internals/RocksDBWindowStoreTest.java | 30 +++++++++++
18 files changed, 372 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index bd18835..129ae6b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -33,8 +33,8 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* Update the value associated with this key
*
* @param key The key to associate the value to
- * @param value The value
- * @throws NullPointerException If null is used for key or value.
+ * @param value The value, it can be null.
+ * @throws NullPointerException If null is used for key.
*/
void put(K key, V value);
@@ -43,9 +43,9 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* is already associated with the key
*
* @param key The key to associate the value to
- * @param value The value
+ * @param value The value, it can be null
* @return The old value or null if there is no such key.
- * @throws NullPointerException If null is used for key or value.
+ * @throws NullPointerException If null is used for key.
*/
V putIfAbsent(K key, V value);
@@ -53,7 +53,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
* Update all the given key/value pairs
*
* @param entries A list of entries to put into the store.
- * @throws NullPointerException If null is used for any key or value.
+ * @throws NullPointerException If null is used for any key.
*/
void putAll(List<KeyValue<K, V>> entries);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
index 094e3fc..e3859fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
@@ -37,6 +37,8 @@ public interface ReadOnlySessionStore<K, AGG> {
*
* @param key record key to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
+ * @throws NullPointerException If null is used for key.
+ *
*/
KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
@@ -49,6 +51,7 @@ public interface ReadOnlySessionStore<K, AGG> {
* @param from first key in the range to find aggregated session values for
* @param to last key in the range to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
+ * @throws NullPointerException If null is used for any of the keys.
*/
KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index b128c58..5252cd6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -56,6 +56,7 @@ public interface ReadOnlyWindowStore<K, V> {
*
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @throws InvalidStateStoreException if the store is not initialized
+ * @throws NullPointerException If null is used for key.
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
@@ -69,6 +70,7 @@ public interface ReadOnlyWindowStore<K, V> {
* @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
+ * @throws NullPointerException If null is used for any key.
*/
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index a4cf12e..c98f8ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -33,6 +33,7 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
* @param key the key to return sessions for
* @param earliestSessionEndTime
* @return iterator of sessions with the matching key and aggregated values
+ * @throws NullPointerException If null is used for key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
@@ -44,12 +45,14 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime
* @return iterator of sessions with the matching keys and aggregated values
+ * @throws NullPointerException If null is used for any key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime);
/**
* Remove the session aggregated with provided {@link Windowed} key from the store
* @param sessionKey key of the session to remove
+ * @throws NullPointerException If null is used for sessionKey.
*/
void remove(final Windowed<K> sessionKey);
@@ -57,6 +60,7 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
* Write the aggregated value for the provided key to the store
* @param sessionKey key of the session to write
* @param aggregate the aggregated value for the session
+ * @throws NullPointerException If null is used for sessionKey.
*/
void put(final Windowed<K> sessionKey, final AGG aggregate);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index fc8ca6f..6a4d5f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -29,11 +29,17 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
/**
* Put a key-value pair with the current wall-clock time as the timestamp
* into the corresponding window
+ * @param key The key to associate the value to
+ * @param value The value, it can be null
+ * @throws NullPointerException If null is used for key.
*/
void put(K key, V value);
/**
* Put a key-value pair with the given timestamp into the corresponding window
+ * @param key The key to associate the value to
+ * @param value The value, it can be null
+ * @throws NullPointerException If null is used for key.
*/
void put(K key, V value, long timestamp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 2a720be..4f86d4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import java.util.List;
+import java.util.Objects;
class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V>, CachedStateStore<K, V> {
@@ -128,9 +129,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public synchronized V get(final K key) {
validateStoreOpen();
- if (key == null) {
- return null;
- }
+ Objects.requireNonNull(key);
+
final byte[] rawKey = serdes.rawKey(key);
return get(rawKey);
}
@@ -215,6 +215,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public synchronized V delete(final K key) {
validateStoreOpen();
+ Objects.requireNonNull(key);
final byte[] rawKey = serdes.rawKey(key);
final Bytes bytesKey = Bytes.wrap(rawKey);
final V v = get(rawKey);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
index 6366351..3022645 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Objects;
/**
* A wrapper over the underlying {@link ReadOnlyKeyValueStore}s found in a {@link
@@ -47,8 +48,10 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
this.storeName = storeName;
}
+
@Override
public V get(final K key) {
+ Objects.requireNonNull(key);
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
for (ReadOnlyKeyValueStore<K, V> store : stores) {
try {
@@ -66,6 +69,8 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
+ Objects.requireNonNull(from);
+ Objects.requireNonNull(to);
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
@Override
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 988a302..beb9ce1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.state.StateSerdes;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* An in-memory LRU cache store based on HashSet and HashMap.
@@ -134,16 +135,19 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
@Override
public synchronized V get(K key) {
+ Objects.requireNonNull(key);
return this.map.get(key);
}
@Override
public synchronized void put(K key, V value) {
+ Objects.requireNonNull(key);
this.map.put(key, value);
}
@Override
public synchronized V putIfAbsent(K key, V value) {
+ Objects.requireNonNull(key);
V originalValue = get(key);
if (originalValue == null) {
put(key, value);
@@ -159,6 +163,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
@Override
public synchronized V delete(K key) {
+ Objects.requireNonNull(key);
V value = this.map.remove(key);
return value;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 7a0b6ee..a3ecb64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -72,7 +71,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private static final int TTL_NOT_USED = -1;
- // TODO: these values should be configurable
private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L;
@@ -164,7 +162,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
try {
this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
} catch (IOException e) {
- throw new StreamsException(e);
+ throw new ProcessorStateException(e);
}
}
@@ -305,6 +303,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public synchronized KeyValueIterator<K, V> range(K from, K to) {
validateStoreOpen();
+
// query rocksdb
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), serdes, from, to);
openIterators.add(rocksDBRangeIterator);
@@ -477,6 +476,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
super(storeName, iter, serdes);
iter.seek(serdes.rawKey(from));
this.rawToKey = serdes.rawKey(to);
+ if (this.rawToKey == null) {
+ throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 3eb406b..345639b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -26,6 +26,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@@ -216,6 +218,56 @@ public abstract class AbstractKeyValueStoreTest {
assertEquals(false, driver.flushedEntryRemoved(4));
}
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+ store.put(null, "anyValue");
+ }
+
+ @Test
+ public void shouldNotThrowNullPointerExceptionOnPutNullValue() throws Exception {
+ store.put(1, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnPutIfAbsentNullKey() throws Exception {
+ store.putIfAbsent(null, "anyValue");
+ }
+
+ @Test
+ public void shouldNotThrowNullPointerExceptionOnPutIfAbsentNullValue() throws Exception {
+ store.putIfAbsent(1, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnPutAllNullKey() throws Exception {
+ store.putAll(Collections.singletonList(new KeyValue<Integer, String>(null, "anyValue")));
+ }
+
+ @Test
+ public void shouldNotThrowNullPointerExceptionOnPutAllNullKey() throws Exception {
+ store.putAll(Collections.singletonList(new KeyValue<Integer, String>(1, null)));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnDeleteNullKey() throws Exception {
+ store.delete(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnGetNullKey() throws Exception {
+ store.get(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnRangeNullFromKey() throws Exception {
+ store.range(null, 2);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnRangeNullToKey() throws Exception {
+ store.range(2, null);
+ }
+
@Test
public void testSize() {
assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries());
@@ -225,6 +277,7 @@ public abstract class AbstractKeyValueStoreTest {
store.put(2, "two");
store.put(4, "four");
store.put(5, "five");
+ store.flush();
assertEquals(5, store.approximateNumEntries());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 2534b01..8106a10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -17,16 +17,20 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.After;
import org.junit.Before;
@@ -45,14 +49,14 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-public class CachingKeyValueStoreTest {
+public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
private final int maxCacheSizeBytes = 150;
private MockProcessorContext context;
private CachingKeyValueStore<String, String> store;
private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
private ThreadCache cache;
- private CacheFlushListenerStub<String> cacheFlushListener;
+ private CacheFlushListenerStub<String, String> cacheFlushListener;
private String topic;
@Before
@@ -74,6 +78,36 @@ public class CachingKeyValueStoreTest {
context.close();
}
+ @SuppressWarnings("unchecked")
+ @Override
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context,
+ final Class<K> keyClass,
+ final Class<V> valueClass,
+ final boolean useContextSerdes) {
+ final String storeName = "cache-store";
+
+ final Stores.PersistentKeyValueFactory<?, ?> factory = Stores
+ .create(storeName)
+ .withKeys(Serdes.Bytes())
+ .withValues(Serdes.ByteArray())
+ .persistent();
+
+
+ final KeyValueStore<Bytes, byte[]> underlyingStore = (KeyValueStore<Bytes, byte[]>) factory.build().get();
+ final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
+ final CachingKeyValueStore<K, V> store;
+ if (useContextSerdes) {
+ store = new CachingKeyValueStore<>(underlyingStore,
+ (Serde<K>) context.keySerde(), (Serde<V>) context.valueSerde());
+ } else {
+ store = new CachingKeyValueStore<>(underlyingStore,
+ Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
+ }
+ store.setFlushListener(cacheFlushListener);
+ store.init(context, store);
+ return store;
+ }
+
@Test
public void shouldPutGetToFromCache() throws Exception {
store.put("key", "value");
@@ -215,11 +249,6 @@ public class CachingKeyValueStoreTest {
store.delete("key");
}
- @Test
- public void shouldReturnNullIfKeyIsNull() throws Exception {
- assertNull(store.get(null));
- }
-
private int addItemsToCache() throws IOException {
int cachedSize = 0;
int i = 0;
@@ -231,11 +260,11 @@ public class CachingKeyValueStoreTest {
return i;
}
- public static class CacheFlushListenerStub<K> implements CacheFlushListener<K, String> {
- final Map<K, Change<String>> forwarded = new HashMap<>();
+ public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<K, V> {
+ final Map<K, Change<V>> forwarded = new HashMap<>();
@Override
- public void apply(final K key, final String newValue, final String oldValue) {
+ public void apply(final K key, final V newValue, final V oldValue) {
forwarded.put(key, new Change<>(newValue, oldValue));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index bfc20ec..6f39fae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -276,6 +276,46 @@ public class CachingSessionStoreTest {
cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
}
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() throws Exception {
+ cachingStore.findSessions(null, 1L, 2L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() throws Exception {
+ cachingStore.findSessions(null, "anyKeyTo", 1L, 2L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() throws Exception {
+ cachingStore.findSessions("anyKeyFrom", null, 1L, 2L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnFetchNullFromKey() throws Exception {
+ cachingStore.fetch(null, "anyToKey");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnFetchNullToKey() throws Exception {
+ cachingStore.fetch("anyFromKey", null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnFetchNullKey() throws Exception {
+ cachingStore.fetch(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnRemoveNullKey() throws Exception {
+ cachingStore.remove(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+ cachingStore.put(null, 1L);
+ }
+
private List<KeyValue<Windowed<String>, Long>> addSessionsUntilOverflow(final String...sessionIds) {
final Random random = new Random();
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index faf6e83..e8ad902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -56,7 +56,7 @@ public class CachingWindowStoreTest {
private MockProcessorContext context;
private RocksDBSegmentedBytesStore underlying;
private CachingWindowStore<String, String> cachingStore;
- private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener;
+ private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> cacheListener;
private ThreadCache cache;
private String topic;
private WindowKeySchema keySchema;
@@ -261,6 +261,31 @@ public class CachingWindowStoreTest {
);
}
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+ cachingStore.put(null, "anyValue");
+ }
+
+ @Test
+ public void shouldNotThrowNullPointerExceptionOnPutNullValue() throws Exception {
+ cachingStore.put("a", null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnFetchNullKey() throws Exception {
+ cachingStore.fetch(null, 1L, 2L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnRangeNullFromKey() throws Exception {
+ cachingStore.fetch(null, "anyTo", 1L, 2L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnRangeNullToKey() throws Exception {
+ cachingStore.fetch("anyFrom", null, 1L, 2L);
+ }
+
private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
return KeyValue.pair(new Windowed<>(key, new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 2e5b872..a8ac65f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -69,6 +69,21 @@ public class CompositeReadOnlyKeyValueStoreTest {
assertNull(theStore.get("whatever"));
}
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnGetNullKey() throws Exception {
+ theStore.get(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnRangeNullFromKey() throws Exception {
+ theStore.range(null, "to");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnRangeNullToKey() throws Exception {
+ theStore.range("from", null);
+ }
+
@Test
public void shouldReturnValueIfExists() throws Exception {
stubOneUnderlying.put("key", "value");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
new file mode 100644
index 0000000..fbfbeb8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Collections;
+
+public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest { // overrides the store factory of AbstractKeyValueStoreTest with an in-memory store that has logging enabled
+
+ @SuppressWarnings("unchecked") // covers the unchecked cast of supplier.get() below
+ @Override
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+ ProcessorContext context,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ boolean useContextSerdes) { // true: serdes are taken from the context; false: keyClass/valueClass are handed to the builder
+
+ StateStoreSupplier supplier; // raw type, which is why the result of get() has to be cast
+ if (useContextSerdes) {
+ supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde())
+ .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build();
+ } else {
+ supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass)
+ .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build(); // same store name and logging config as the other branch
+ }
+
+ KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+ store.init(context, store); // the store is initialised with the given context before it is returned
+ return store;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index c30b0e2..bd10db9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -194,6 +194,46 @@ public class RocksDBSessionStoreTest {
assertThat(rangeResults, equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L)));
}
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() throws Exception {
+ sessionStore.findSessions(null, 1L, 2L); // three-argument (single key) call with a null key
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() throws Exception {
+ sessionStore.findSessions(null, "anyKeyTo", 1L, 2L); // four-argument (key range) call, first key is null
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() throws Exception {
+ sessionStore.findSessions("anyKeyFrom", null, 1L, 2L); // four-argument (key range) call, second key is null
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnFetchNullFromKey() throws Exception {
+ sessionStore.fetch(null, "anyToKey"); // two-key fetch, first key is null
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnFetchNullToKey() throws Exception {
+ sessionStore.fetch("anyFromKey", null); // two-key fetch, second key is null
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnFetchNullKey() throws Exception {
+ sessionStore.fetch(null); // single-key fetch with a null key
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnRemoveNullKey() throws Exception {
+ sessionStore.remove(null); // null is passed as the key to remove
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+ sessionStore.put(null, 1L); // null key with a non-null value
+ }
+
static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
while (iterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index c43a39a..c020062 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -16,32 +16,46 @@
*/
package org.apache.kafka.streams.state.internals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
+
+import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.io.IOException;
import org.rocksdb.Options;
public class RocksDBStoreTest {
private final File tempDir = TestUtils.tempDirectory();
private RocksDBStore<String, String> subject;
+ private MockProcessorContext context;
+ private File dir;
@Before
public void setUp() throws Exception { // builds a new store, directory and context for each test
subject = new RocksDBStore<>("test", Serdes.String(), Serdes.String());
+ dir = TestUtils.tempDirectory(); // kept in a field so a test can delete it later; separate from the tempDir field
+ context = new MockProcessorContext(dir,
+ Serdes.String(),
+ Serdes.String(),
+ new NoOpRecordCollector(),
+ new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); // presumably 0 is the cache size, i.e. caching disabled - TODO confirm
}
@After
@@ -71,6 +85,26 @@ public class RocksDBStoreTest {
assertTrue(MockRocksDbConfigSetter.called);
}
+ @Test(expected = ProcessorStateException.class) // passes only if the body throws ProcessorStateException
+ public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() throws IOException {
+ final File tmpDir = TestUtils.tempDirectory(); // local directory, used instead of the dir field from setUp()
+ MockProcessorContext tmpContext = new MockProcessorContext(tmpDir,
+ Serdes.String(),
+ Serdes.Long(),
+ new NoOpRecordCollector(),
+ new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+ tmpDir.setReadOnly(); // NOTE(review): return value is not checked and the permission is never restored
+
+ subject.openDB(tmpContext); // openDB is called directly, without a preceding init()
+ }
+
+ @Test(expected = ProcessorStateException.class) // passes only if the body throws ProcessorStateException
+ public void shouldThrowProcessorStateExeptionOnPutDeletedDir() throws IOException { // NOTE(review): "Exeption" in the name is a typo for "Exception"
+ subject.init(context, subject);
+ Utils.delete(dir); // dir is the directory the context was constructed with in setUp()
+ subject.put("anyKey", "anyValue");
+ subject.flush(); // NOTE(review): not visible here whether put or flush raises the expected exception - confirm
+ }
public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
static boolean called;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4e0deca/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 040def6..883506d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -714,6 +714,36 @@ public class RocksDBWindowStoreTest {
)));
}
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
+ windowStore = createWindowStore(context, false, true);
+ windowStore.put(null, "anyValue"); // null key with a non-null value
+ }
+
+ @Test // no expected exception: the test passes when the body completes without throwing
+ public void shouldNotThrowNullPointerExceptionOnPutNullValue() throws Exception {
+ windowStore = createWindowStore(context, false, true);
+ windowStore.put(1, null); // non-null key with a null value
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnGetNullKey() throws Exception {
+ windowStore = createWindowStore(context, false, true);
+ windowStore.fetch(null, 1L, 2L); // three-argument (single key) fetch with a null key
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnRangeNullFromKey() throws Exception {
+ windowStore = createWindowStore(context, false, true);
+ windowStore.fetch(null, 2, 1L, 2L); // four-argument (key range) fetch, first key is null
+ }
+
+ @Test(expected = NullPointerException.class) // passes only if the body throws NullPointerException
+ public void shouldThrowNullPointerExceptionOnRangeNullToKey() throws Exception {
+ windowStore = createWindowStore(context, false, true);
+ windowStore.fetch(1, null, 1L, 2L); // four-argument (key range) fetch, second key is null
+ }
+
@SuppressWarnings("unchecked")
@Test
public void shouldFetchAndIterateOverExactBinaryKeys() throws Exception {