You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/23 07:59:18 UTC

kafka git commit: KAFKA-4913; prevent creation of window stores with less than 2 segments

Repository: kafka
Updated Branches:
  refs/heads/trunk ac5397964 -> 2420491f4


KAFKA-4913; prevent creation of window stores with less than 2 segments

Throw an `IllegalArgumentException` when attempting to create a `WindowStore` with fewer than 2 segments, whether via `Stores` or directly with `RocksDBWindowStoreSupplier`.

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska <en...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bb...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #3410 from dguy/kafka-4913


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2420491f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2420491f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2420491f

Branch: refs/heads/trunk
Commit: 2420491f417012ba5215a9f72fa5e3a0c586c8e8
Parents: ac53979
Author: Damian Guy <da...@gmail.com>
Authored: Fri Jun 23 08:59:13 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Jun 23 08:59:13 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/state/Stores.java  |  5 +++-
 .../internals/RocksDBWindowStoreSupplier.java   |  5 +++-
 .../apache/kafka/streams/state/StoresTest.java  | 15 ++++++++++
 .../RocksDBWindowStoreSupplierTest.java         | 29 ++++++++++++--------
 4 files changed, 41 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2420491f/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 86ee1d2..fef4ade 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -102,7 +102,7 @@ public class Stores {
                             @Override
                             public PersistentKeyValueFactory<K, V> persistent() {
                                 return new PersistentKeyValueFactory<K, V>() {
-                                    public boolean cachingEnabled;
+                                    boolean cachingEnabled;
                                     private long windowSize;
                                     private final Map<String, String> logConfig = new HashMap<>();
                                     private int numSegments = 0;
@@ -113,6 +113,9 @@ public class Stores {
 
                                     @Override
                                     public PersistentKeyValueFactory<K, V> windowed(final long windowSize, final long retentionPeriod, final int numSegments, final boolean retainDuplicates) {
+                                        if (numSegments < RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
+                                            throw new IllegalArgumentException("numSegments must be >= " + RocksDBWindowStoreSupplier.MIN_SEGMENTS);
+                                        }
                                         this.windowSize = windowSize;
                                         this.numSegments = numSegments;
                                         this.retentionPeriod = retentionPeriod;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2420491f/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index b1e0b02..e19d09f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -33,7 +33,7 @@ import java.util.Map;
  */
 
 public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
-
+    public static final int MIN_SEGMENTS = 2;
     private final long retentionPeriod;
     private final boolean retainDuplicates;
     private final int numSegments;
@@ -47,6 +47,9 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
 
     public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
         super(name, keySerde, valueSerde, time, logged, logConfig);
+        if (numSegments < MIN_SEGMENTS) {
+            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
+        }
         this.retentionPeriod = retentionPeriod;
         this.retainDuplicates = retainDuplicates;
         this.numSegments = numSegments;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2420491f/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index c2aa14d..66adbf5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class StoresTest {
 
@@ -80,4 +81,18 @@ public class StoresTest {
 
         assertFalse(supplier.loggingEnabled());
     }
+
+    @Test
+    public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() throws Exception {
+        final Stores.PersistentKeyValueFactory<String, String> storeFactory = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .persistent();
+        try {
+            storeFactory.windowed(1, 1, 1, false);
+            fail("Should have thrown illegal argument exception as number of segments is less than 2");
+        } catch (final IllegalArgumentException e) {
+         // ok
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2420491f/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index c9301a1..77fe8ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -53,12 +53,14 @@ public class RocksDBWindowStoreSupplierTest {
     @After
     public void close() {
         context.close();
-        store.close();
+        if (store != null) {
+            store.close();
+        }
     }
 
     @Test
     public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception {
-        store = createStore(true, false);
+        store = createStore(true, false, 3);
         final List<ProducerRecord> logged = new ArrayList<>();
         final NoOpRecordCollector collector = new NoOpRecordCollector() {
             @Override
@@ -85,7 +87,7 @@ public class RocksDBWindowStoreSupplierTest {
 
     @Test
     public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws Exception {
-        store = createStore(false, false);
+        store = createStore(false, false, 3);
         final List<ProducerRecord> logged = new ArrayList<>();
         final NoOpRecordCollector collector = new NoOpRecordCollector() {
             @Override
@@ -112,7 +114,7 @@ public class RocksDBWindowStoreSupplierTest {
 
     @Test
     public void shouldBeCachedWindowStoreWhenCachingEnabled() throws Exception {
-        store = createStore(false, true);
+        store = createStore(false, true, 3);
         store.init(context, store);
         context.setTime(1);
         store.put("a", "b");
@@ -123,20 +125,20 @@ public class RocksDBWindowStoreSupplierTest {
 
     @Test
     public void shouldReturnRocksDbStoreWhenCachingAndLoggingDisabled() throws Exception {
-        store = createStore(false, false);
+        store = createStore(false, false, 3);
         assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
     }
 
     @Test
     public void shouldReturnRocksDbStoreWhenCachingDisabled() throws Exception {
-        store = createStore(true, false);
+        store = createStore(true, false, 3);
         assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenCached() throws Exception {
-        store = createStore(false, true);
+        store = createStore(false, true, 3);
         store.init(context, store);
         final StreamsMetrics metrics = context.metrics();
         assertFalse(metrics.metrics().isEmpty());
@@ -145,7 +147,7 @@ public class RocksDBWindowStoreSupplierTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenLogged() throws Exception {
-        store = createStore(true, false);
+        store = createStore(true, false, 3);
         store.init(context, store);
         final StreamsMetrics metrics = context.metrics();
         assertFalse(metrics.metrics().isEmpty());
@@ -154,17 +156,22 @@ public class RocksDBWindowStoreSupplierTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception {
-        store = createStore(false, false);
+        store = createStore(false, false, 3);
         store.init(context, store);
         final StreamsMetrics metrics = context.metrics();
         assertFalse(metrics.metrics().isEmpty());
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIllegalArgumentExceptionIfNumSegmentsLessThanTwo() throws Exception {
+        createStore(true, true, 1);
+    }
+
     @SuppressWarnings("unchecked")
-    private WindowStore<String, String> createStore(final boolean logged, final boolean cached) {
+    private WindowStore<String, String> createStore(final boolean logged, final boolean cached, final int numSegments) {
         return new RocksDBWindowStoreSupplier<>(STORE_NAME,
                                                 10,
-                                                3,
+                                                numSegments,
                                                 false,
                                                 Serdes.String(),
                                                 Serdes.String(),