You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/10/12 14:57:47 UTC

kafka git commit: MINOR: improve Store parameter checks

Repository: kafka
Updated Branches:
  refs/heads/trunk 62682d078 -> 53c23bb5e


MINOR: improve Store parameter checks

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>

Closes #4063 from mjsax/minor-improve-store-parameter-checks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53c23bb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53c23bb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53c23bb5

Branch: refs/heads/trunk
Commit: 53c23bb5e65c147d7b2cae0a7fd9b3ba46c8fce5
Parents: 62682d0
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Oct 12 15:55:43 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Oct 12 15:55:43 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/state/Stores.java  | 50 +++++++++++----
 .../apache/kafka/streams/state/StoresTest.java  | 65 ++++++++++++++++++++
 2 files changed, 102 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/53c23bb5/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c9c44af..0ce6d9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Factory for creating state stores in Kafka Streams.
@@ -85,21 +86,23 @@ public class Stores {
 
     /**
      * Create a persistent {@link KeyValueBytesStoreSupplier}.
-     * @param name  name of the store
+     * @param name  name of the store (cannot be {@code null})
      * @return  an instance of a {@link KeyValueBytesStoreSupplier} that can be used
      * to build a persistent store
      */
     public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
+        Objects.requireNonNull(name, "name cannot be null");
         return new RocksDbKeyValueBytesStoreSupplier(name);
     }
 
     /**
      * Create an in-memory {@link KeyValueBytesStoreSupplier}.
-     * @param name  name of the store
+     * @param name  name of the store (cannot be {@code null})
      * @return  an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
      * build an in-memory store
      */
     public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
+        Objects.requireNonNull(name, "name cannot be null");
         return new KeyValueBytesStoreSupplier() {
             @Override
             public String name() {
@@ -120,12 +123,16 @@ public class Stores {
 
     /**
      * Create a LRU Map {@link KeyValueBytesStoreSupplier}.
-     * @param name          name of the store
-     * @param maxCacheSize  maximum number of items in the LRU
+     * @param name          name of the store (cannot be {@code null})
+     * @param maxCacheSize  maximum number of items in the LRU (cannot be negative)
      * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
      * an LRU Map based store
      */
     public static KeyValueBytesStoreSupplier lruMap(final String name, final int maxCacheSize) {
+        Objects.requireNonNull(name, "name cannot be null");
+        if (maxCacheSize < 0) {
+            throw new IllegalArgumentException("maxCacheSize cannot be negative");
+        }
         return new KeyValueBytesStoreSupplier() {
             @Override
             public String name() {
@@ -146,10 +153,10 @@ public class Stores {
 
     /**
      * Create a persistent {@link WindowBytesStoreSupplier}.
-     * @param name                  name of the store
-     * @param retentionPeriod       length of time to retain data in the store
-     * @param numSegments           number of db segments
-     * @param windowSize            size of the windows
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     * @param numSegments           number of db segments (cannot be zero or negative)
+     * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
      */
@@ -158,24 +165,38 @@ public class Stores {
                                                                  final int numSegments,
                                                                  final long windowSize,
                                                                  final boolean retainDuplicates) {
+        Objects.requireNonNull(name, "name cannot be null");
+        if (retentionPeriod < 0) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (numSegments < 1) {
+            throw new IllegalArgumentException("numSegments cannot must smaller than 1");
+        }
+        if (windowSize < 0) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
         return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, numSegments, windowSize, retainDuplicates);
     }
 
     /**
      * Create a persistent {@link SessionBytesStoreSupplier}.
-     * @param name              name of the store
-     * @param retentionPeriod   length ot time to retain data in the store
+     * @param name              name of the store (cannot be {@code null})
+     * @param retentionPeriod   length ot time to retain data in the store (cannot be negative)
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      */
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final long retentionPeriod) {
+        Objects.requireNonNull(name, "name cannot be null");
+        if (retentionPeriod < 0) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
         return new RocksDbSessionBytesStoreSupplier(name, retentionPeriod);
     }
 
 
     /**
      * Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
-     * @param supplier      a {@link WindowBytesStoreSupplier}
+     * @param supplier      a {@link WindowBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
      * @param valueSerde    the value serde to use
      * @param <K>           key type
@@ -185,12 +206,13 @@ public class Stores {
     public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier,
                                                                             final Serde<K> keySerde,
                                                                             final Serde<V> valueSerde) {
+        Objects.requireNonNull(supplier, "supplier cannot be null");
         return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
     }
 
     /**
      * Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
-     * @param supplier      a {@link KeyValueBytesStoreSupplier}
+     * @param supplier      a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
      * @param valueSerde    the value serde to use
      * @param <K>           key type
@@ -200,12 +222,13 @@ public class Stores {
     public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                 final Serde<K> keySerde,
                                                                                 final Serde<V> valueSerde) {
+        Objects.requireNonNull(supplier, "supplier cannot be null");
         return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
     }
 
     /**
      * Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
-     * @param supplier      a {@link SessionBytesStoreSupplier}
+     * @param supplier      a {@link SessionBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
      * @param valueSerde    the value serde to use
      * @param <K>           key type
@@ -215,6 +238,7 @@ public class Stores {
     public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                               final Serde<K> keySerde,
                                                                               final Serde<V> valueSerde) {
+        Objects.requireNonNull(supplier, "supplier cannot be null");
         return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/53c23bb5/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 900c8da..665ebc0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -39,6 +39,71 @@ import static org.junit.Assert.fail;
 
 public class StoresTest {
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() {
+        Stores.persistentKeyValueStore(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
+        Stores.inMemoryKeyValueStore(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfILruMapStoreNameIsNull() {
+        Stores.lruMap(null, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfILruMapStoreCapacityIsNegative() {
+        Stores.lruMap("anyName", -1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
+        Stores.persistentWindowStore(null, 0, 1, 0, false);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
+        Stores.persistentWindowStore("anyName", -1, 1, 0, false);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
+        Stores.persistentWindowStore("anyName", 0, 0, 0, false);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
+        Stores.persistentWindowStore("anyName", 0, 1, -1, false);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
+        Stores.persistentSessionStore(null, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfIPersistentSessionStoreRetentionPeriodIsNegative() {
+        Stores.persistentSessionStore("anyName", -1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfSupplierIsNullForWindowStoreBuilder() {
+        Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfSupplierIsNullForKeyValueStoreBuilder() {
+        Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
+        Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+    }
+
     @SuppressWarnings("deprecation")
     @Test
     public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() {