You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/17 22:50:57 UTC

[kafka] branch trunk updated: KAFKA-3522: TopologyTestDriver should only return custom stores via untyped getStateStore() method (#6756)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e018bfe  KAFKA-3522: TopologyTestDriver should only return custom stores via untyped getStateStore() method (#6756)
e018bfe is described below

commit e018bfe2b050b55b479bbabb9fc26ebcc211b177
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sat May 18 00:50:45 2019 +0200

    KAFKA-3522: TopologyTestDriver should only return custom stores via untyped getStateStore() method (#6756)
    
    Reviewers: Sophie Blee-Goldman <so...@confluent.io>,  Bill Bejeck <bb...@gmail.com>
---
 .../apache/kafka/streams/TopologyTestDriver.java   |  61 +++++-
 .../kafka/streams/TopologyTestDriverTest.java      | 213 ++++++++++++++++-----
 2 files changed, 221 insertions(+), 53 deletions(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index bceead2..23dbf30 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -63,6 +63,9 @@ import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -571,6 +574,10 @@ public class TopologyTestDriver implements Closeable {
      * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
      * <p>
      * Note, that {@code StateStore} might be {@code null} if a store is added but not connected to any processor.
+     * <p>
+     * <strong>Caution:</strong> Using this method to access stores that are added by the DSL is unsafe as the store
+     * types may change. Stores added by the DSL should only be accessed via the corresponding typed methods
+     * like {@link #getKeyValueStore(String)} etc.
      *
      * @return all stores my name
      * @see #getStateStore(String)
@@ -584,7 +591,7 @@ public class TopologyTestDriver implements Closeable {
     public Map<String, StateStore> getAllStateStores() {
         final Map<String, StateStore> allStores = new HashMap<>();
         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
-            allStores.put(storeName, getStateStore(storeName));
+            allStores.put(storeName, getStateStore(storeName, false));
         }
         return allStores;
     }
@@ -593,11 +600,17 @@ public class TopologyTestDriver implements Closeable {
      * Get the {@link StateStore} with the given name.
      * The store can be a "regular" or global store.
      * <p>
+     * Should be used for custom stores only.
+     * For built-in stores, the corresponding typed methods like {@link #getKeyValueStore(String)} should be used.
+     * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
      * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
      *
      * @param name the name of the store
      * @return the state store, or {@code null} if no store has been registered with the given name
+     * @throws IllegalArgumentException if the store is a built-in store like {@link KeyValueStore},
+     * {@link WindowStore}, or {@link SessionStore}
+     *
      * @see #getAllStateStores()
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
@@ -606,10 +619,18 @@ public class TopologyTestDriver implements Closeable {
      * @see #getSessionStore(String)
      */
     @SuppressWarnings("WeakerAccess")
-    public StateStore getStateStore(final String name) {
+    public StateStore getStateStore(final String name) throws IllegalArgumentException {
+        return getStateStore(name, true);
+    }
+
+    private StateStore getStateStore(final String name,
+                                     final boolean throwForBuiltInStores) {
         if (task != null) {
             final StateStore stateStore = ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
             if (stateStore != null) {
+                if (throwForBuiltInStores) {
+                    throwIfBuiltInStore(stateStore);
+                }
                 return stateStore;
             }
         }
@@ -617,6 +638,9 @@ public class TopologyTestDriver implements Closeable {
         if (globalStateManager != null) {
             final StateStore stateStore = globalStateManager.getGlobalStore(name);
             if (stateStore != null) {
+                if (throwForBuiltInStores) {
+                    throwIfBuiltInStore(stateStore);
+                }
                 return stateStore;
             }
 
@@ -625,6 +649,29 @@ public class TopologyTestDriver implements Closeable {
         return null;
     }
 
+    private void throwIfBuiltInStore(final StateStore stateStore) {
+        if (stateStore instanceof TimestampedKeyValueStore) {
+            throw new IllegalArgumentException("Store " + stateStore.name()
+                + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
+        }
+        if (stateStore instanceof ReadOnlyKeyValueStore) {
+            throw new IllegalArgumentException("Store " + stateStore.name()
+                + " is a key-value store and should be accessed via `getKeyValueStore()`");
+        }
+        if (stateStore instanceof TimestampedWindowStore) {
+            throw new IllegalArgumentException("Store " + stateStore.name()
+                + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
+        }
+        if (stateStore instanceof ReadOnlyWindowStore) {
+            throw new IllegalArgumentException("Store " + stateStore.name()
+                + " is a window store and should be accessed via `getWindowStore()`");
+        }
+        if (stateStore instanceof ReadOnlySessionStore) {
+            throw new IllegalArgumentException("Store " + stateStore.name()
+                + " is a session store and should be accessed via `getSessionStore()`");
+        }
+    }
+
     /**
      * Get the {@link KeyValueStore} or {@link TimestampedKeyValueStore} with the given name.
      * The store can be a "regular" or global store.
@@ -648,7 +695,7 @@ public class TopologyTestDriver implements Closeable {
      */
     @SuppressWarnings({"unchecked", "WeakerAccess"})
     public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
-        final StateStore store = getStateStore(name);
+        final StateStore store = getStateStore(name, false);
         if (store instanceof TimestampedKeyValueStore) {
             log.info("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
             return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
@@ -674,7 +721,7 @@ public class TopologyTestDriver implements Closeable {
      */
     @SuppressWarnings({"unchecked", "WeakerAccess"})
     public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name) {
-        final StateStore store = getStateStore(name);
+        final StateStore store = getStateStore(name, false);
         return store instanceof TimestampedKeyValueStore ? (TimestampedKeyValueStore<K, V>) store : null;
     }
 
@@ -701,7 +748,7 @@ public class TopologyTestDriver implements Closeable {
      */
     @SuppressWarnings({"unchecked", "WeakerAccess"})
     public <K, V> WindowStore<K, V> getWindowStore(final String name) {
-        final StateStore store = getStateStore(name);
+        final StateStore store = getStateStore(name, false);
         if (store instanceof TimestampedWindowStore) {
             log.info("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
             return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
@@ -727,7 +774,7 @@ public class TopologyTestDriver implements Closeable {
      */
     @SuppressWarnings({"unchecked", "WeakerAccess"})
     public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name) {
-        final StateStore store = getStateStore(name);
+        final StateStore store = getStateStore(name, false);
         return store instanceof TimestampedWindowStore ? (TimestampedWindowStore<K, V>) store : null;
     }
 
@@ -749,7 +796,7 @@ public class TopologyTestDriver implements Closeable {
      */
     @SuppressWarnings({"unchecked", "WeakerAccess"})
     public <K, V> SessionStore<K, V> getSessionStore(final String name) {
-        final StateStore store = getStateStore(name);
+        final StateStore store = getStateStore(name, false);
         return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
     }
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 2394203..84fae99 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -78,6 +78,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -749,7 +750,173 @@ public class TopologyTestDriverTest {
         final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";
 
         final Topology topology = setupSingleProcessorTopology();
+        addStoresToTopology(
+            topology,
+            persistent,
+            keyValueStoreName,
+            timestampedKeyValueStoreName,
+            windowStoreName,
+            timestampedWindowStoreName,
+            sessionStoreName,
+            globalKeyValueStoreName,
+            globalTimestampedKeyValueStoreName);
 
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        // verify state stores
+        assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
+        assertNull(testDriver.getWindowStore(keyValueStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
+        assertNull(testDriver.getSessionStore(keyValueStoreName));
+
+        assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
+        assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
+        assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
+        assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
+
+        assertNull(testDriver.getKeyValueStore(windowStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
+        assertNotNull(testDriver.getWindowStore(windowStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
+        assertNull(testDriver.getSessionStore(windowStoreName));
+
+        assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
+        assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
+        assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
+        assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
+
+        assertNull(testDriver.getKeyValueStore(sessionStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
+        assertNull(testDriver.getWindowStore(sessionStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
+        assertNotNull(testDriver.getSessionStore(sessionStoreName));
+
+        // verify global stores
+        assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
+        assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
+        assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
+
+        assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
+        assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
+        assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
+        assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
+    }
+
+    @Test
+    public void shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod() {
+        shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(false);
+    }
+
+    @Test
+    public void shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod() {
+        shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(true);
+    }
+
+    private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final boolean persistent) {
+        final String keyValueStoreName = "keyValueStore";
+        final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+        final String windowStoreName = "windowStore";
+        final String timestampedWindowStoreName = "windowTimestampStore";
+        final String sessionStoreName = "sessionStore";
+        final String globalKeyValueStoreName = "globalKeyValueStore";
+        final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";
+
+        final Topology topology = setupSingleProcessorTopology();
+        addStoresToTopology(
+            topology,
+            persistent,
+            keyValueStoreName,
+            timestampedKeyValueStoreName,
+            windowStoreName,
+            timestampedWindowStoreName,
+            sessionStoreName,
+            globalKeyValueStoreName,
+            globalTimestampedKeyValueStoreName);
+
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        {
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> testDriver.getStateStore(keyValueStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + keyValueStoreName
+                    + " is a key-value store and should be accessed via `getKeyValueStore()`"));
+        }
+        {
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> testDriver.getStateStore(timestampedKeyValueStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + timestampedKeyValueStoreName
+                    + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
+        }
+        {
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> testDriver.getStateStore(windowStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + windowStoreName
+                    + " is a window store and should be accessed via `getWindowStore()`"));
+        }
+        {
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> testDriver.getStateStore(timestampedWindowStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + timestampedWindowStoreName
+                    + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`"));
+        }
+        {
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> testDriver.getStateStore(sessionStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + sessionStoreName
+                    + " is a session store and should be accessed via `getSessionStore()`"));
+        }
+        {
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> testDriver.getStateStore(globalKeyValueStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + globalKeyValueStoreName
+                    + " is a key-value store and should be accessed via `getKeyValueStore()`"));
+        }
+        {
+            final IllegalArgumentException e = assertThrows(
+                IllegalArgumentException.class,
+                () -> testDriver.getStateStore(globalTimestampedKeyValueStoreName));
+            assertThat(
+                e.getMessage(),
+                equalTo("Store " + globalTimestampedKeyValueStoreName
+                    + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"));
+        }
+    }
+
+    private void addStoresToTopology(final Topology topology,
+                                     final boolean persistent,
+                                     final String keyValueStoreName,
+                                     final String timestampedKeyValueStoreName,
+                                     final String windowStoreName,
+                                     final String timestampedWindowStoreName,
+                                     final String sessionStoreName,
+                                     final String globalKeyValueStoreName,
+                                     final String globalTimestampedKeyValueStoreName) {
         // add state stores
         topology.addStateStore(
             Stores.keyValueStoreBuilder(
@@ -835,52 +1002,6 @@ public class TopologyTestDriverTest {
             "topicDummy2",
             "processorDummy2",
             () -> null);
-
-        testDriver = new TopologyTestDriver(topology, config);
-
-        // verify state stores
-        assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
-        assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
-        assertNull(testDriver.getWindowStore(keyValueStoreName));
-        assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
-        assertNull(testDriver.getSessionStore(keyValueStoreName));
-
-        assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
-        assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
-        assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
-        assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
-        assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
-
-        assertNull(testDriver.getKeyValueStore(windowStoreName));
-        assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
-        assertNotNull(testDriver.getWindowStore(windowStoreName));
-        assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
-        assertNull(testDriver.getSessionStore(windowStoreName));
-
-        assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
-        assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
-        assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
-        assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
-        assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
-
-        assertNull(testDriver.getKeyValueStore(sessionStoreName));
-        assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
-        assertNull(testDriver.getWindowStore(sessionStoreName));
-        assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
-        assertNotNull(testDriver.getSessionStore(sessionStoreName));
-
-        // verify global stores
-        assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
-        assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
-        assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
-        assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
-        assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
-
-        assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
-        assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
-        assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
-        assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
-        assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
     }
 
     @Test