You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/22 07:40:59 UTC
kafka git commit: KAFKA-4659;
Improve test coverage of CachingKeyValueStore
Repository: kafka
Updated Branches:
refs/heads/trunk 914e42a28 -> 4e8797f54
KAFKA-4659; Improve test coverage of CachingKeyValueStore
Author: Jeyhun Karimov <je...@gmail.com>
Reviewers: Matthias J Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
Closes #3291 from jeyhunkarimov/KAFKA-4659
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e8797f5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e8797f5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e8797f5
Branch: refs/heads/trunk
Commit: 4e8797f54ef9d2d7f40e3100943ae8afd5496b16
Parents: 914e42a
Author: Jeyhun Karimov <je...@gmail.com>
Authored: Thu Jun 22 08:40:54 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Jun 22 08:40:54 2017 +0100
----------------------------------------------------------------------
.../state/internals/CachingKeyValueStore.java | 3 ++
.../internals/CachingKeyValueStoreTest.java | 46 ++++++++++++++++++++
2 files changed, 49 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8797f5/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 4f86d4e..6190b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -184,11 +184,13 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public synchronized void put(final K key, final V value) {
+ Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
put(serdes.rawKey(key), value);
}
private synchronized void put(final byte[] rawKey, final V value) {
+ Objects.requireNonNull(rawKey, "key cannot be null");
final byte[] rawValue = serdes.rawValue(value);
cache.put(cacheName, Bytes.wrap(rawKey), new LRUCacheEntry(rawValue, true, context.offset(),
context.timestamp(), context.partition(), context.topic()));
@@ -196,6 +198,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public synchronized V putIfAbsent(final K key, final V value) {
+ Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
final byte[] rawKey = serdes.rawKey(key);
final V v = get(rawKey);
http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8797f5/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 8106a10..9110144 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -48,6 +48,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@@ -243,6 +245,50 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
store.putIfAbsent("b", "c");
}
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnPutWithNullKey() {
+ store.put(null, "c");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionOnPutIfAbsentWithNullKey() {
+ store.putIfAbsent(null, "c");
+ }
+
+ @Test
+ public void shouldThrowNullPointerExceptionOnPutAllWithNullKey() {
+ List<KeyValue<String, String>> entries = new ArrayList<>();
+ entries.add(new KeyValue<String, String>(null, "a"));
+ try {
+ store.putAll(entries);
+ fail("Should have thrown NullPointerException while putAll null key");
+ } catch (NullPointerException e) { }
+ }
+
+ @Test
+ public void shouldPutIfAbsent() {
+ store.putIfAbsent("b", "2");
+ assertTrue(store.get("b").equals("2"));
+
+ store.putIfAbsent("b", "3");
+ assertTrue(store.get("b").equals("2"));
+ }
+
+ @Test
+ public void shouldPutAll() {
+ List<KeyValue<String, String>> entries = new ArrayList<>();
+ entries.add(new KeyValue<>("a", "1"));
+ entries.add(new KeyValue<>("b", "2"));
+ store.putAll(entries);
+ assertEquals(store.get("a"), "1");
+ assertEquals(store.get("b"), "2");
+ }
+
+ @Test
+ public void shouldReturnUnderlying() {
+ assertTrue(store.underlying().equals(underlyingStore));
+ }
+
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowIfTryingToDeleteFromClosedCachingStore() throws Exception {
store.close();