You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/13 19:05:51 UTC

kafka git commit: KAFKA-4657: Improve test coverage of CompositeReadOnlyWindowStore

Repository: kafka
Updated Branches:
  refs/heads/trunk cd69daa41 -> 85e7a0a2b


KAFKA-4657: Improve test coverage of CompositeReadOnlyWindowStore

This commit brings improved test coverage for the window store fetch method
and WindowStoreIterator

Author: Andrey Dyachkov <an...@zalando.de>

Reviewers: Damian Guy, Guozhang Wang

Closes #2672 from adyach/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85e7a0a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85e7a0a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85e7a0a2

Branch: refs/heads/trunk
Commit: 85e7a0a2b0d56891c13c15fc88a3011877f9c9e9
Parents: cd69daa
Author: Andrey Dyachkov <an...@zalando.de>
Authored: Mon Mar 13 12:05:47 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Mar 13 12:05:47 2017 -0700

----------------------------------------------------------------------
 .../CompositeReadOnlyWindowStoreTest.java       | 49 ++++++++++++++++++--
 1 file changed, 46 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/85e7a0a2/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index d0c82e4..6f4ff07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -22,12 +22,16 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.StateStoreProviderStub;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -43,6 +47,9 @@ public class CompositeReadOnlyWindowStoreTest {
     private ReadOnlyWindowStoreStub<String, String>
             otherUnderlyingStore;
 
+    @Rule
+    public final ExpectedException windowStoreIteratorException = ExpectedException.none();
+
     @Before
     public void before() {
         stubProviderOne = new StateStoreProviderStub(false);
@@ -106,17 +113,53 @@ public class CompositeReadOnlyWindowStoreTest {
         assertEquals(Collections.singletonList(new KeyValue<>(1L, "my-value")), results);
     }
 
-
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowInvalidStateStoreExceptionOnRebalance() throws Exception {
         final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new StateStoreProviderStub(true), QueryableStoreTypes.windowStore(), "foo");
         store.fetch("key", 1, 10);
     }
 
-    @Test(expected = InvalidStateStoreException.class)
+    @Test
     public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() throws Exception {
         underlyingWindowStore.setOpen(false);
-        underlyingWindowStore.fetch("key", 1, 10);
+        final CompositeReadOnlyWindowStore<Object, Object> store =
+                new CompositeReadOnlyWindowStore<>(stubProviderOne, QueryableStoreTypes.windowStore(), "window-store");
+        try {
+            store.fetch("key", 1, 10);
+            Assert.fail("InvalidStateStoreException was expected");
+        } catch (InvalidStateStoreException e) {
+            Assert.assertEquals("State store is not available anymore and may have been migrated to another instance; " +
+                    "please re-discover its location from the state metadata.", e.getMessage());
+        }
+    }
+
+    @Test
+    public void emptyIteratorAlwaysReturnsFalse() throws Exception {
+        final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new
+                StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo");
+        final WindowStoreIterator<Object> windowStoreIterator = store.fetch("key", 1, 10);
+
+        Assert.assertFalse(windowStoreIterator.hasNext());
+    }
+
+    @Test
+    public void emptyIteratorPeekNextKeyShouldThrowNoSuchElementException() throws Exception {
+        final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new
+                StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo");
+        final WindowStoreIterator<Object> windowStoreIterator = store.fetch("key", 1, 10);
+
+        windowStoreIteratorException.expect(NoSuchElementException.class);
+        windowStoreIterator.peekNextKey();
+    }
+
+    @Test
+    public void emptyIteratorNextShouldThrowNoSuchElementException() throws Exception {
+        final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new
+                StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo");
+        final WindowStoreIterator<Object> windowStoreIterator = store.fetch("key", 1, 10);
+
+        windowStoreIteratorException.expect(NoSuchElementException.class);
+        windowStoreIterator.next();
     }
 
 }
\ No newline at end of file