You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/17 22:50:57 UTC
[kafka] branch trunk updated: KAFKA-3522: TopologyTestDriver should
only return custom stores via untyped getStateStore() method (#6756)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e018bfe KAFKA-3522: TopologyTestDriver should only return custom stores via untyped getStateStore() method (#6756)
e018bfe is described below
commit e018bfe2b050b55b479bbabb9fc26ebcc211b177
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sat May 18 00:50:45 2019 +0200
KAFKA-3522: TopologyTestDriver should only return custom stores via untyped getStateStore() method (#6756)
Reviewers: Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../apache/kafka/streams/TopologyTestDriver.java | 61 +++++-
.../kafka/streams/TopologyTestDriverTest.java | 213 ++++++++++++++++-----
2 files changed, 221 insertions(+), 53 deletions(-)
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index bceead2..23dbf30 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -63,6 +63,9 @@ import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -571,6 +574,10 @@ public class TopologyTestDriver implements Closeable {
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
* <p>
* Note, that {@code StateStore} might be {@code null} if a store is added but not connected to any processor.
+ * <p>
+ * <strong>Caution:</strong> Using this method to access stores that are added by the DSL is unsafe as the store
+ * types may change. Stores added by the DSL should only be accessed via the corresponding typed methods
+ * like {@link #getKeyValueStore(String)} etc.
*
* @return all stores my name
* @see #getStateStore(String)
@@ -584,7 +591,7 @@ public class TopologyTestDriver implements Closeable {
public Map<String, StateStore> getAllStateStores() {
final Map<String, StateStore> allStores = new HashMap<>();
for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
- allStores.put(storeName, getStateStore(storeName));
+ allStores.put(storeName, getStateStore(storeName, false));
}
return allStores;
}
@@ -593,11 +600,17 @@ public class TopologyTestDriver implements Closeable {
* Get the {@link StateStore} with the given name.
* The store can be a "regular" or global store.
* <p>
+ * Should be used for custom stores only.
+ * For built-in stores, the corresponding typed methods like {@link #getKeyValueStore(String)} should be used.
+ * <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
* @return the state store, or {@code null} if no store has been registered with the given name
+ * @throws IllegalArgumentException if the store is a built-in store like {@link KeyValueStore},
+ * {@link WindowStore}, or {@link SessionStore}
+ *
* @see #getAllStateStores()
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
@@ -606,10 +619,18 @@ public class TopologyTestDriver implements Closeable {
* @see #getSessionStore(String)
*/
@SuppressWarnings("WeakerAccess")
- public StateStore getStateStore(final String name) {
+ public StateStore getStateStore(final String name) throws IllegalArgumentException {
+ return getStateStore(name, true);
+ }
+
+ private StateStore getStateStore(final String name,
+ final boolean throwForBuiltInStores) {
if (task != null) {
final StateStore stateStore = ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
if (stateStore != null) {
+ if (throwForBuiltInStores) {
+ throwIfBuiltInStore(stateStore);
+ }
return stateStore;
}
}
@@ -617,6 +638,9 @@ public class TopologyTestDriver implements Closeable {
if (globalStateManager != null) {
final StateStore stateStore = globalStateManager.getGlobalStore(name);
if (stateStore != null) {
+ if (throwForBuiltInStores) {
+ throwIfBuiltInStore(stateStore);
+ }
return stateStore;
}
@@ -625,6 +649,29 @@ public class TopologyTestDriver implements Closeable {
return null;
}
+ private void throwIfBuiltInStore(final StateStore stateStore) {
+ if (stateStore instanceof TimestampedKeyValueStore) {
+ throw new IllegalArgumentException("Store " + stateStore.name()
+ + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
+ }
+ if (stateStore instanceof ReadOnlyKeyValueStore) {
+ throw new IllegalArgumentException("Store " + stateStore.name()
+ + " is a key-value store and should be accessed via `getKeyValueStore()`");
+ }
+ if (stateStore instanceof TimestampedWindowStore) {
+ throw new IllegalArgumentException("Store " + stateStore.name()
+ + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
+ }
+ if (stateStore instanceof ReadOnlyWindowStore) {
+ throw new IllegalArgumentException("Store " + stateStore.name()
+ + " is a window store and should be accessed via `getWindowStore()`");
+ }
+ if (stateStore instanceof ReadOnlySessionStore) {
+ throw new IllegalArgumentException("Store " + stateStore.name()
+ + " is a session store and should be accessed via `getSessionStore()`");
+ }
+ }
+
/**
* Get the {@link KeyValueStore} or {@link TimestampedKeyValueStore} with the given name.
* The store can be a "regular" or global store.
@@ -648,7 +695,7 @@ public class TopologyTestDriver implements Closeable {
*/
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
- final StateStore store = getStateStore(name);
+ final StateStore store = getStateStore(name, false);
if (store instanceof TimestampedKeyValueStore) {
log.info("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
@@ -674,7 +721,7 @@ public class TopologyTestDriver implements Closeable {
*/
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name) {
- final StateStore store = getStateStore(name);
+ final StateStore store = getStateStore(name, false);
return store instanceof TimestampedKeyValueStore ? (TimestampedKeyValueStore<K, V>) store : null;
}
@@ -701,7 +748,7 @@ public class TopologyTestDriver implements Closeable {
*/
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> WindowStore<K, V> getWindowStore(final String name) {
- final StateStore store = getStateStore(name);
+ final StateStore store = getStateStore(name, false);
if (store instanceof TimestampedWindowStore) {
log.info("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
@@ -727,7 +774,7 @@ public class TopologyTestDriver implements Closeable {
*/
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name) {
- final StateStore store = getStateStore(name);
+ final StateStore store = getStateStore(name, false);
return store instanceof TimestampedWindowStore ? (TimestampedWindowStore<K, V>) store : null;
}
@@ -749,7 +796,7 @@ public class TopologyTestDriver implements Closeable {
*/
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> SessionStore<K, V> getSessionStore(final String name) {
- final StateStore store = getStateStore(name);
+ final StateStore store = getStateStore(name, false);
return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 2394203..84fae99 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -78,6 +78,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -749,7 +750,173 @@ public class TopologyTestDriverTest {
final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";
final Topology topology = setupSingleProcessorTopology();
+ addStoresToTopology(
+ topology,
+ persistent,
+ keyValueStoreName,
+ timestampedKeyValueStoreName,
+ windowStoreName,
+ timestampedWindowStoreName,
+ sessionStoreName,
+ globalKeyValueStoreName,
+ globalTimestampedKeyValueStoreName);
+
+ testDriver = new TopologyTestDriver(topology, config);
+
+ // verify state stores
+ assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
+ assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
+ assertNull(testDriver.getWindowStore(keyValueStoreName));
+ assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
+ assertNull(testDriver.getSessionStore(keyValueStoreName));
+
+ assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
+ assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
+ assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
+ assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
+ assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
+
+ assertNull(testDriver.getKeyValueStore(windowStoreName));
+ assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
+ assertNotNull(testDriver.getWindowStore(windowStoreName));
+ assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
+ assertNull(testDriver.getSessionStore(windowStoreName));
+
+ assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
+ assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
+ assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
+ assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
+ assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
+
+ assertNull(testDriver.getKeyValueStore(sessionStoreName));
+ assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
+ assertNull(testDriver.getWindowStore(sessionStoreName));
+ assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
+ assertNotNull(testDriver.getSessionStore(sessionStoreName));
+
+ // verify global stores
+ assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
+ assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
+ assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
+ assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
+ assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
+
+ assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
+ assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
+ assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
+ assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
+ assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
+ }
+
+ @Test
+ public void shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod() {
+ shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(false);
+ }
+
+ @Test
+ public void shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod() {
+ shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(true);
+ }
+
+ private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final boolean persistent) {
+ final String keyValueStoreName = "keyValueStore";
+ final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+ final String windowStoreName = "windowStore";
+ final String timestampedWindowStoreName = "windowTimestampStore";
+ final String sessionStoreName = "sessionStore";
+ final String globalKeyValueStoreName = "globalKeyValueStore";
+ final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";
+
+ final Topology topology = setupSingleProcessorTopology();
+ addStoresToTopology(
+ topology,
+ persistent,
+ keyValueStoreName,
+ timestampedKeyValueStoreName,
+ windowStoreName,
+ timestampedWindowStoreName,
+ sessionStoreName,
+ globalKeyValueStoreName,
+ globalTimestampedKeyValueStoreName);
+
+
+ testDriver = new TopologyTestDriver(topology, config);
+
+ {
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(keyValueStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + keyValueStoreName
+ + " is a key-value store and should be accessed via `getKeyValueStore()`"));
+ }
+ {
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(timestampedKeyValueStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + timestampedKeyValueStoreName
+ + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
+ }
+ {
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(windowStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + windowStoreName
+ + " is a window store and should be accessed via `getWindowStore()`"));
+ }
+ {
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(timestampedWindowStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + timestampedWindowStoreName
+ + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`"));
+ }
+ {
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(sessionStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + sessionStoreName
+ + " is a session store and should be accessed via `getSessionStore()`"));
+ }
+ {
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(globalKeyValueStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + globalKeyValueStoreName
+ + " is a key-value store and should be accessed via `getKeyValueStore()`"));
+ }
+ {
+ final IllegalArgumentException e = assertThrows(
+ IllegalArgumentException.class,
+ () -> testDriver.getStateStore(globalTimestampedKeyValueStoreName));
+ assertThat(
+ e.getMessage(),
+ equalTo("Store " + globalTimestampedKeyValueStoreName
+ + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
+ }
+ }
+
+ private void addStoresToTopology(final Topology topology,
+ final boolean persistent,
+ final String keyValueStoreName,
+ final String timestampedKeyValueStoreName,
+ final String windowStoreName,
+ final String timestampedWindowStoreName,
+ final String sessionStoreName,
+ final String globalKeyValueStoreName,
+ final String globalTimestampedKeyValueStoreName) {
// add state stores
topology.addStateStore(
Stores.keyValueStoreBuilder(
@@ -835,52 +1002,6 @@ public class TopologyTestDriverTest {
"topicDummy2",
"processorDummy2",
() -> null);
-
- testDriver = new TopologyTestDriver(topology, config);
-
- // verify state stores
- assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
- assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
- assertNull(testDriver.getWindowStore(keyValueStoreName));
- assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
- assertNull(testDriver.getSessionStore(keyValueStoreName));
-
- assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
- assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
- assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
- assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
- assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
-
- assertNull(testDriver.getKeyValueStore(windowStoreName));
- assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
- assertNotNull(testDriver.getWindowStore(windowStoreName));
- assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
- assertNull(testDriver.getSessionStore(windowStoreName));
-
- assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
- assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
- assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
- assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
- assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
-
- assertNull(testDriver.getKeyValueStore(sessionStoreName));
- assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
- assertNull(testDriver.getWindowStore(sessionStoreName));
- assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
- assertNotNull(testDriver.getSessionStore(sessionStoreName));
-
- // verify global stores
- assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
- assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
- assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
- assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
- assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
-
- assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
- assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
- assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
- assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
- assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
}
@Test