You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/22 07:40:59 UTC

kafka git commit: KAFKA-4659; Improve test coverage of CachingKeyValueStore

Repository: kafka
Updated Branches:
  refs/heads/trunk 914e42a28 -> 4e8797f54


KAFKA-4659; Improve test coverage of CachingKeyValueStore

Author: Jeyhun Karimov <je...@gmail.com>

Reviewers: Matthias J Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #3291 from jeyhunkarimov/KAFKA-4659


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e8797f5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e8797f5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e8797f5

Branch: refs/heads/trunk
Commit: 4e8797f54ef9d2d7f40e3100943ae8afd5496b16
Parents: 914e42a
Author: Jeyhun Karimov <je...@gmail.com>
Authored: Thu Jun 22 08:40:54 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Jun 22 08:40:54 2017 +0100

----------------------------------------------------------------------
 .../state/internals/CachingKeyValueStore.java   |  3 ++
 .../internals/CachingKeyValueStoreTest.java     | 46 ++++++++++++++++++++
 2 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8797f5/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 4f86d4e..6190b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -184,11 +184,13 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public synchronized void put(final K key, final V value) {
+        Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         put(serdes.rawKey(key), value);
     }
 
     private synchronized void put(final byte[] rawKey, final V value) {
+        Objects.requireNonNull(rawKey, "key cannot be null");
         final byte[] rawValue = serdes.rawValue(value);
         cache.put(cacheName, Bytes.wrap(rawKey), new LRUCacheEntry(rawValue, true, context.offset(),
                   context.timestamp(), context.partition(), context.topic()));
@@ -196,6 +198,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public synchronized V putIfAbsent(final K key, final V value) {
+        Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         final byte[] rawKey = serdes.rawKey(key);
         final V v = get(rawKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8797f5/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 8106a10..9110144 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -48,6 +48,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 
 public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
@@ -243,6 +245,50 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         store.putIfAbsent("b", "c");
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutWithNullKey() {
+        store.put(null, "c");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionOnPutIfAbsentWithNullKey() {
+        store.putIfAbsent(null, "c");
+    }
+
+    @Test
+    public void shouldThrowNullPointerExceptionOnPutAllWithNullKey() {
+        List<KeyValue<String, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<String, String>(null, "a"));
+        try {
+            store.putAll(entries);
+            fail("Should have thrown NullPointerException while putAll null key");
+        } catch (NullPointerException e) { }
+    }
+
+    @Test
+    public void shouldPutIfAbsent() {
+        store.putIfAbsent("b", "2");
+        assertTrue(store.get("b").equals("2"));
+
+        store.putIfAbsent("b", "3");
+        assertTrue(store.get("b").equals("2"));
+    }
+
+    @Test
+    public void shouldPutAll() {
+        List<KeyValue<String, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>("a", "1"));
+        entries.add(new KeyValue<>("b", "2"));
+        store.putAll(entries);
+        assertEquals(store.get("a"), "1");
+        assertEquals(store.get("b"), "2");
+    }
+
+    @Test
+    public void shouldReturnUnderlying() {
+        assertTrue(store.underlying().equals(underlyingStore));
+    }
+
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToDeleteFromClosedCachingStore() throws Exception {
         store.close();