You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/13 19:05:51 UTC
kafka git commit: KAFKA-4657: Improve test coverage of
CompositeReadOnlyWindowStore
Repository: kafka
Updated Branches:
refs/heads/trunk cd69daa41 -> 85e7a0a2b
KAFKA-4657: Improve test coverage of CompositeReadOnlyWindowStore
This commit brings improved test coverage for the window store fetch method
and WindowStoreIterator.
Author: Andrey Dyachkov <an...@zalando.de>
Reviewers: Damian Guy, Guozhang Wang
Closes #2672 from adyach/trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85e7a0a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85e7a0a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85e7a0a2
Branch: refs/heads/trunk
Commit: 85e7a0a2b0d56891c13c15fc88a3011877f9c9e9
Parents: cd69daa
Author: Andrey Dyachkov <an...@zalando.de>
Authored: Mon Mar 13 12:05:47 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Mar 13 12:05:47 2017 -0700
----------------------------------------------------------------------
.../CompositeReadOnlyWindowStoreTest.java | 49 ++++++++++++++++++--
1 file changed, 46 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/85e7a0a2/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index d0c82e4..6f4ff07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -22,12 +22,16 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.StateStoreProviderStub;
import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.NoSuchElementException;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
@@ -43,6 +47,9 @@ public class CompositeReadOnlyWindowStoreTest {
private ReadOnlyWindowStoreStub<String, String>
otherUnderlyingStore;
+ @Rule
+ public final ExpectedException windowStoreIteratorException = ExpectedException.none();
+
@Before
public void before() {
stubProviderOne = new StateStoreProviderStub(false);
@@ -106,17 +113,53 @@ public class CompositeReadOnlyWindowStoreTest {
assertEquals(Collections.singletonList(new KeyValue<>(1L, "my-value")), results);
}
-
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStateStoreExceptionOnRebalance() throws Exception {
final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new StateStoreProviderStub(true), QueryableStoreTypes.windowStore(), "foo");
store.fetch("key", 1, 10);
}
- @Test(expected = InvalidStateStoreException.class)
+ @Test
public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() throws Exception {
underlyingWindowStore.setOpen(false);
- underlyingWindowStore.fetch("key", 1, 10);
+ final CompositeReadOnlyWindowStore<Object, Object> store =
+ new CompositeReadOnlyWindowStore<>(stubProviderOne, QueryableStoreTypes.windowStore(), "window-store");
+ try {
+ store.fetch("key", 1, 10);
+ Assert.fail("InvalidStateStoreException was expected");
+ } catch (InvalidStateStoreException e) {
+ Assert.assertEquals("State store is not available anymore and may have been migrated to another instance; " +
+ "please re-discover its location from the state metadata.", e.getMessage());
+ }
+ }
+
+ @Test
+ public void emptyIteratorAlwaysReturnsFalse() throws Exception {
+ final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new
+ StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo");
+ final WindowStoreIterator<Object> windowStoreIterator = store.fetch("key", 1, 10);
+
+ Assert.assertFalse(windowStoreIterator.hasNext());
+ }
+
+ @Test
+ public void emptyIteratorPeekNextKeyShouldThrowNoSuchElementException() throws Exception {
+ final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new
+ StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo");
+ final WindowStoreIterator<Object> windowStoreIterator = store.fetch("key", 1, 10);
+
+ windowStoreIteratorException.expect(NoSuchElementException.class);
+ windowStoreIterator.peekNextKey();
+ }
+
+ @Test
+ public void emptyIteratorNextShouldThrowNoSuchElementException() throws Exception {
+ final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new
+ StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo");
+ final WindowStoreIterator<Object> windowStoreIterator = store.fetch("key", 1, 10);
+
+ windowStoreIteratorException.expect(NoSuchElementException.class);
+ windowStoreIterator.next();
}
}
\ No newline at end of file