You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/23 07:59:18 UTC
kafka git commit: KAFKA-4913; prevent creation of window stores with less than 2 segments
Repository: kafka
Updated Branches:
refs/heads/trunk ac5397964 -> 2420491f4
KAFKA-4913; prevent creation of window stores with less than 2 segments
Throw an `IllegalArgumentException` when a `WindowStore` is created with fewer than 2 segments, whether via `Stores` or directly with `RocksDBWindowStoreSupplier`.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska <en...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bb...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #3410 from dguy/kafka-4913
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2420491f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2420491f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2420491f
Branch: refs/heads/trunk
Commit: 2420491f417012ba5215a9f72fa5e3a0c586c8e8
Parents: ac53979
Author: Damian Guy <da...@gmail.com>
Authored: Fri Jun 23 08:59:13 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Jun 23 08:59:13 2017 +0100
----------------------------------------------------------------------
.../org/apache/kafka/streams/state/Stores.java | 5 +++-
.../internals/RocksDBWindowStoreSupplier.java | 5 +++-
.../apache/kafka/streams/state/StoresTest.java | 15 ++++++++++
.../RocksDBWindowStoreSupplierTest.java | 29 ++++++++++++--------
4 files changed, 41 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2420491f/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 86ee1d2..fef4ade 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -102,7 +102,7 @@ public class Stores {
@Override
public PersistentKeyValueFactory<K, V> persistent() {
return new PersistentKeyValueFactory<K, V>() {
- public boolean cachingEnabled;
+ boolean cachingEnabled;
private long windowSize;
private final Map<String, String> logConfig = new HashMap<>();
private int numSegments = 0;
@@ -113,6 +113,9 @@ public class Stores {
@Override
public PersistentKeyValueFactory<K, V> windowed(final long windowSize, final long retentionPeriod, final int numSegments, final boolean retainDuplicates) {
+ if (numSegments < RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
+ throw new IllegalArgumentException("numSegments must be >= " + RocksDBWindowStoreSupplier.MIN_SEGMENTS);
+ }
this.windowSize = windowSize;
this.numSegments = numSegments;
this.retentionPeriod = retentionPeriod;
http://git-wip-us.apache.org/repos/asf/kafka/blob/2420491f/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index b1e0b02..e19d09f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -33,7 +33,7 @@ import java.util.Map;
*/
public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
-
+ public static final int MIN_SEGMENTS = 2;
private final long retentionPeriod;
private final boolean retainDuplicates;
private final int numSegments;
@@ -47,6 +47,9 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
super(name, keySerde, valueSerde, time, logged, logConfig);
+ if (numSegments < MIN_SEGMENTS) {
+ throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
+ }
this.retentionPeriod = retentionPeriod;
this.retainDuplicates = retainDuplicates;
this.numSegments = numSegments;
http://git-wip-us.apache.org/repos/asf/kafka/blob/2420491f/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index c2aa14d..66adbf5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class StoresTest {
@@ -80,4 +81,18 @@ public class StoresTest {
assertFalse(supplier.loggingEnabled());
}
+
+ @Test
+ public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() throws Exception {
+ final Stores.PersistentKeyValueFactory<String, String> storeFactory = Stores.create("store")
+ .withKeys(Serdes.String())
+ .withValues(Serdes.String())
+ .persistent();
+ try {
+ storeFactory.windowed(1, 1, 1, false);
+ fail("Should have thrown illegal argument exception as number of segments is less than 2");
+ } catch (final IllegalArgumentException e) {
+ // ok
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/2420491f/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index c9301a1..77fe8ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -53,12 +53,14 @@ public class RocksDBWindowStoreSupplierTest {
@After
public void close() {
context.close();
- store.close();
+ if (store != null) {
+ store.close();
+ }
}
@Test
public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception {
- store = createStore(true, false);
+ store = createStore(true, false, 3);
final List<ProducerRecord> logged = new ArrayList<>();
final NoOpRecordCollector collector = new NoOpRecordCollector() {
@Override
@@ -85,7 +87,7 @@ public class RocksDBWindowStoreSupplierTest {
@Test
public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws Exception {
- store = createStore(false, false);
+ store = createStore(false, false, 3);
final List<ProducerRecord> logged = new ArrayList<>();
final NoOpRecordCollector collector = new NoOpRecordCollector() {
@Override
@@ -112,7 +114,7 @@ public class RocksDBWindowStoreSupplierTest {
@Test
public void shouldBeCachedWindowStoreWhenCachingEnabled() throws Exception {
- store = createStore(false, true);
+ store = createStore(false, true, 3);
store.init(context, store);
context.setTime(1);
store.put("a", "b");
@@ -123,20 +125,20 @@ public class RocksDBWindowStoreSupplierTest {
@Test
public void shouldReturnRocksDbStoreWhenCachingAndLoggingDisabled() throws Exception {
- store = createStore(false, false);
+ store = createStore(false, false, 3);
assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
}
@Test
public void shouldReturnRocksDbStoreWhenCachingDisabled() throws Exception {
- store = createStore(true, false);
+ store = createStore(true, false, 3);
assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
}
@SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenCached() throws Exception {
- store = createStore(false, true);
+ store = createStore(false, true, 3);
store.init(context, store);
final StreamsMetrics metrics = context.metrics();
assertFalse(metrics.metrics().isEmpty());
@@ -145,7 +147,7 @@ public class RocksDBWindowStoreSupplierTest {
@SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenLogged() throws Exception {
- store = createStore(true, false);
+ store = createStore(true, false, 3);
store.init(context, store);
final StreamsMetrics metrics = context.metrics();
assertFalse(metrics.metrics().isEmpty());
@@ -154,17 +156,22 @@ public class RocksDBWindowStoreSupplierTest {
@SuppressWarnings("unchecked")
@Test
public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception {
- store = createStore(false, false);
+ store = createStore(false, false, 3);
store.init(context, store);
final StreamsMetrics metrics = context.metrics();
assertFalse(metrics.metrics().isEmpty());
}
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowIllegalArgumentExceptionIfNumSegmentsLessThanTwo() throws Exception {
+ createStore(true, true, 1);
+ }
+
@SuppressWarnings("unchecked")
- private WindowStore<String, String> createStore(final boolean logged, final boolean cached) {
+ private WindowStore<String, String> createStore(final boolean logged, final boolean cached, final int numSegments) {
return new RocksDBWindowStoreSupplier<>(STORE_NAME,
10,
- 3,
+ numSegments,
false,
Serdes.String(),
Serdes.String(),