You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/10/12 14:57:47 UTC
kafka git commit: MINOR: improve Store parameter checks
Repository: kafka
Updated Branches:
refs/heads/trunk 62682d078 -> 53c23bb5e
MINOR: improve Store parameter checks
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>
Closes #4063 from mjsax/minor-improve-store-parameter-checks
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53c23bb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53c23bb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53c23bb5
Branch: refs/heads/trunk
Commit: 53c23bb5e65c147d7b2cae0a7fd9b3ba46c8fce5
Parents: 62682d0
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Oct 12 15:55:43 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Oct 12 15:55:43 2017 +0100
----------------------------------------------------------------------
.../org/apache/kafka/streams/state/Stores.java | 50 +++++++++++----
.../apache/kafka/streams/state/StoresTest.java | 65 ++++++++++++++++++++
2 files changed, 102 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/53c23bb5/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c9c44af..0ce6d9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/**
* Factory for creating state stores in Kafka Streams.
@@ -85,21 +86,23 @@ public class Stores {
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
- * @param name name of the store
+ * @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* to build a persistent store
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
+ Objects.requireNonNull(name, "name cannot be null");
return new RocksDbKeyValueBytesStoreSupplier(name);
}
/**
* Create an in-memory {@link KeyValueBytesStoreSupplier}.
- * @param name name of the store
+ * @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
* build an in-memory store
*/
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
+ Objects.requireNonNull(name, "name cannot be null");
return new KeyValueBytesStoreSupplier() {
@Override
public String name() {
@@ -120,12 +123,16 @@ public class Stores {
/**
* Create a LRU Map {@link KeyValueBytesStoreSupplier}.
- * @param name name of the store
- * @param maxCacheSize maximum number of items in the LRU
+ * @param name name of the store (cannot be {@code null})
+ * @param maxCacheSize maximum number of items in the LRU (cannot be negative)
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
* an LRU Map based store
*/
public static KeyValueBytesStoreSupplier lruMap(final String name, final int maxCacheSize) {
+ Objects.requireNonNull(name, "name cannot be null");
+ if (maxCacheSize < 0) {
+ throw new IllegalArgumentException("maxCacheSize cannot be negative");
+ }
return new KeyValueBytesStoreSupplier() {
@Override
public String name() {
@@ -146,10 +153,10 @@ public class Stores {
/**
* Create a persistent {@link WindowBytesStoreSupplier}.
- * @param name name of the store
- * @param retentionPeriod length of time to retain data in the store
- * @param numSegments number of db segments
- * @param windowSize size of the windows
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+ * @param numSegments number of db segments (cannot be zero or negative)
+ * @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
*/
@@ -158,24 +165,38 @@ public class Stores {
final int numSegments,
final long windowSize,
final boolean retainDuplicates) {
+ Objects.requireNonNull(name, "name cannot be null");
+ if (retentionPeriod < 0) {
+ throw new IllegalArgumentException("retentionPeriod cannot be negative");
+ }
+ if (numSegments < 1) {
+ throw new IllegalArgumentException("numSegments cannot must smaller than 1");
+ }
+ if (windowSize < 0) {
+ throw new IllegalArgumentException("windowSize cannot be negative");
+ }
return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, numSegments, windowSize, retainDuplicates);
}
/**
* Create a persistent {@link SessionBytesStoreSupplier}.
- * @param name name of the store
- * @param retentionPeriod length ot time to retain data in the store
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store (cannot be negative)
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final long retentionPeriod) {
+ Objects.requireNonNull(name, "name cannot be null");
+ if (retentionPeriod < 0) {
+ throw new IllegalArgumentException("retentionPeriod cannot be negative");
+ }
return new RocksDbSessionBytesStoreSupplier(name, retentionPeriod);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
- * @param supplier a {@link WindowBytesStoreSupplier}
+ * @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use
* @param <K> key type
@@ -185,12 +206,13 @@ public class Stores {
public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
- * @param supplier a {@link KeyValueBytesStoreSupplier}
+ * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use
* @param <K> key type
@@ -200,12 +222,13 @@ public class Stores {
public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
- * @param supplier a {@link SessionBytesStoreSupplier}
+ * @param supplier a {@link SessionBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use
* @param <K> key type
@@ -215,6 +238,7 @@ public class Stores {
public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/53c23bb5/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 900c8da..665ebc0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -39,6 +39,71 @@ import static org.junit.Assert.fail;
public class StoresTest {
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() {
+ Stores.persistentKeyValueStore(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
+ Stores.inMemoryKeyValueStore(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfILruMapStoreNameIsNull() {
+ Stores.lruMap(null, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowIfILruMapStoreCapacityIsNegative() {
+ Stores.lruMap("anyName", -1);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
+ Stores.persistentWindowStore(null, 0, 1, 0, false);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
+ Stores.persistentWindowStore("anyName", -1, 1, 0, false);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
+ Stores.persistentWindowStore("anyName", 0, 0, 0, false);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
+ Stores.persistentWindowStore("anyName", 0, 1, -1, false);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
+ Stores.persistentSessionStore(null, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowIfIPersistentSessionStoreRetentionPeriodIsNegative() {
+ Stores.persistentSessionStore("anyName", -1);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfSupplierIsNullForWindowStoreBuilder() {
+ Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfSupplierIsNullForKeyValueStoreBuilder() {
+ Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
+ Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+ }
+
@SuppressWarnings("deprecation")
@Test
public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() {