You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/12 15:09:20 UTC

[kafka] branch 2.7 updated: KAFKA-10598: Improve IQ name and type checks (#9408)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 716d938  KAFKA-10598: Improve IQ name and type checks (#9408)
716d938 is described below

commit 716d9389623ac20f350abefe04e91036016b6cb7
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Oct 12 09:34:32 2020 -0500

    KAFKA-10598: Improve IQ name and type checks (#9408)
    
    Previously, we would throw a confusing error, "the store has migrated,"
    when users asked for a store that was not in the topology at all, or when
    the type of the store didn't match the QueryableStoreType parameter.
    
    Adds an up-front check that the requested store is registered and also
    a better error message when the QueryableStoreType parameter
    doesn't match the store's type.
    
    Reviewers: Guozhang Wang <gu...@apache.org>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  21 +++-
 .../org/apache/kafka/streams/StreamsBuilder.java   |   8 +-
 .../kafka/streams/kstream/KGroupedStream.java      |  12 +-
 .../kafka/streams/kstream/KGroupedTable.java       |  10 +-
 .../kstream/SessionWindowedCogroupedKStream.java   |   4 +-
 .../streams/kstream/SessionWindowedKStream.java    |   8 +-
 .../apache/kafka/streams/kstream/Suppressed.java   |   2 +-
 .../kstream/TimeWindowedCogroupedKStream.java      |   4 +-
 .../kafka/streams/kstream/TimeWindowedKStream.java |   8 +-
 .../kstream/internals/MaterializedInternal.java    |   8 +-
 .../processor/internals/ProcessorTopology.java     |  15 ++-
 .../internals/StreamThreadStateStoreProvider.java  |  61 +++++-----
 .../org/apache/kafka/streams/TopologyTest.java     |   4 +-
 .../integration/JoinStoreIntegrationTest.java      |  23 +++-
 ...reignKeyJoinMaterializationIntegrationTest.java |  16 +--
 .../integration/QueryableStateIntegrationTest.java | 128 ++++++++++++++++++---
 .../integration/utils/IntegrationTestUtils.java    |  30 ++++-
 .../internals/InternalTopologyBuilderTest.java     |   8 ++
 .../StreamThreadStateStoreProviderTest.java        |  43 ++++---
 19 files changed, 295 insertions(+), 118 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 83b84dc..b5e64e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -153,6 +153,8 @@ public class KafkaStreams implements AutoCloseable {
     private final QueryableStoreProvider queryableStoreProvider;
     private final Admin adminClient;
     private final StreamsMetricsImpl streamsMetrics;
+    private final ProcessorTopology taskTopology;
+    private final ProcessorTopology globalTaskTopology;
 
     GlobalStreamThread globalStreamThread;
     private KafkaStreams.StateListener stateListener;
@@ -702,7 +704,7 @@ public class KafkaStreams implements AutoCloseable {
         internalTopologyBuilder.rewriteTopology(config);
 
         // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
-        final ProcessorTopology taskTopology = internalTopologyBuilder.buildTopology();
+        taskTopology = internalTopologyBuilder.buildTopology();
 
         streamsMetadataState = new StreamsMetadataState(
                 internalTopologyBuilder,
@@ -719,7 +721,7 @@ public class KafkaStreams implements AutoCloseable {
         // create the stream thread, global update thread, and cleanup thread
         threads = new StreamThread[numStreamThreads];
 
-        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
         final boolean hasGlobalTopology = globalTaskTopology != null;
 
         if (numStreamThreads == 0 && !hasGlobalTopology) {
@@ -1198,10 +1200,21 @@ public class KafkaStreams implements AutoCloseable {
      *
      * @param storeQueryParameters   the parameters used to fetch a queryable store
      * @return A facade wrapping the local {@link StateStore} instances
-     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
-     * {@code queryableStoreType} doesn't exist
+     * @throws InvalidStateStoreException If the specified store name does not exist in the topology
+     *                                    or if the Streams instance isn't in a queryable state.
+     *                                    If the store's type does not match the QueryableStoreType,
+     *                                    the Streams instance is not in a queryable state with respect
+     *                                    to the parameters, or if the store is not available locally, then
+     *                                    an InvalidStateStoreException is thrown upon store access.
      */
     public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
+        final String storeName = storeQueryParameters.storeName();
+        if ((taskTopology == null || !taskTopology.hasStore(storeName))
+            && (globalTaskTopology == null || !globalTaskTopology.hasStore(storeName))) {
+            throw new InvalidStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
         validateIsRunningOrRebalancing();
         return queryableStoreProvider.getStore(storeQueryParameters);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 62f7b94..5bd9bf1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -259,7 +259,7 @@ public class StreamsBuilder {
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queryable through Interactive Queries.
      * An internal changelog topic is created by default. Because the source topic can
      * be used for recovery, you can avoid creating the changelog topic by setting
      * the {@code "topology.optimization"} to {@code "all"} in the {@link StreamsConfig}.
@@ -281,7 +281,7 @@ public class StreamsBuilder {
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queryable through Interactive Queries.
      * An internal changelog topic is created by default. Because the source topic can
      * be used for recovery, you can avoid creating the changelog topic by setting
      * the {@code "topology.optimization"} to {@code "all"} in the {@link StreamsConfig}.
@@ -342,7 +342,7 @@ public class StreamsBuilder {
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queryable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -373,7 +373,7 @@ public class StreamsBuilder {
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queryable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index d805f8b..4ecc1e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -60,7 +60,7 @@ public interface KGroupedStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -87,7 +87,7 @@ public interface KGroupedStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -215,7 +215,7 @@ public interface KGroupedStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -277,7 +277,7 @@ public interface KGroupedStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -340,7 +340,7 @@ public interface KGroupedStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -386,7 +386,7 @@ public interface KGroupedStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 79bdef4..e193bd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -146,7 +146,7 @@ public interface KGroupedTable<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -175,7 +175,7 @@ public interface KGroupedTable<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -376,7 +376,7 @@ public interface KGroupedTable<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -611,7 +611,7 @@ public interface KGroupedTable<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -681,7 +681,7 @@ public interface KGroupedTable<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
index 2a2cf34..d4f2094 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
@@ -85,7 +85,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -130,7 +130,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 13bdd17..0f8ec21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -73,7 +73,7 @@ public interface SessionWindowedKStream<K, V> {
      * user-specified in {@link StreamsConfig StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -103,7 +103,7 @@ public interface SessionWindowedKStream<K, V> {
      * user-specified in {@link StreamsConfig StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -241,7 +241,7 @@ public interface SessionWindowedKStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -290,7 +290,7 @@ public interface SessionWindowedKStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 060b949..2f96993 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -153,7 +153,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
      * To accomplish this, the operator will buffer events from the window until the window close (that is,
      * until the end-time passes, and additionally until the grace period expires). Since windowed operators
      * are required to reject out-of-order events for a window whose grace period is expired, there is an additional
-     * guarantee that the final results emitted from this suppression will match any queriable state upstream.
+     * guarantee that the final results emitted from this suppression will match any queryable state upstream.
      *
      * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
      *                     This is required to be a "strict" config, since it would violate the "final results"
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
index d7e261a..f010501 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
@@ -82,7 +82,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -121,7 +121,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 85f7cf8..fc53f28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -72,7 +72,7 @@ public interface TimeWindowedKStream<K, V> {
      * user-specified in {@link StreamsConfig StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -101,7 +101,7 @@ public interface TimeWindowedKStream<K, V> {
      * user-specified in {@link StreamsConfig StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -239,7 +239,7 @@ public interface TimeWindowedKStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
@@ -283,7 +283,7 @@ public interface TimeWindowedKStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
      * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
+     * Note that the internal store name may not be queryable through Interactive Queries.
      * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 3948705..4a3cbb2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -26,7 +26,7 @@ import java.util.Map;
 
 public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
 
-    private final boolean queriable;
+    private final boolean queryable;
 
     public MaterializedInternal(final Materialized<K, V, S> materialized) {
         this(materialized, null, null);
@@ -39,14 +39,14 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
 
         // if storeName is not provided, the corresponding KTable would never be queryable;
         // but we still need to provide an internal name for it in case we materialize.
-        queriable = storeName() != null;
-        if (!queriable && nameProvider != null) {
+        queryable = storeName() != null;
+        if (!queryable && nameProvider != null) {
             storeName = nameProvider.newStoreName(generatedStorePrefix);
         }
     }
 
     public String queryableStoreName() {
-        return queriable ? storeName() : null;
+        return queryable ? storeName() : null;
     }
 
     public String storeName() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 75f3688..4ee3c0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -16,17 +16,18 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import org.apache.kafka.streams.processor.StateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.stream.Collectors;
 
 public class ProcessorTopology {
     private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class);
@@ -37,6 +38,7 @@ public class ProcessorTopology {
     private final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic;
     private final Set<String> terminalNodes;
     private final List<StateStore> stateStores;
+    private final Set<String> stateStoreNames;
     private final Set<String> repartitionTopics;
 
     // the following contains entries for the entire topology, eg stores that do not belong to this ProcessorTopology
@@ -54,6 +56,7 @@ public class ProcessorTopology {
         this.sourceNodesByTopic = new HashMap<>(sourceNodesByTopic);
         this.sinksByTopic = Collections.unmodifiableMap(sinksByTopic);
         this.stateStores = Collections.unmodifiableList(stateStores);
+        stateStoreNames = stateStores.stream().map(StateStore::name).collect(Collectors.toSet());
         this.globalStateStores = Collections.unmodifiableList(globalStateStores);
         this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
         this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
@@ -103,6 +106,10 @@ public class ProcessorTopology {
         return stateStores;
     }
 
+    public boolean hasStore(final String storeName) {
+        return stateStoreNames.contains(storeName);
+    }
+
     public List<StateStore> globalStateStores() {
         return Collections.unmodifiableList(globalStateStores);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index d5a175d..df541d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -27,12 +27,10 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.stream.Collectors;
 
 public class StreamThreadStateStoreProvider {
 
@@ -55,17 +53,27 @@ public class StreamThreadStateStoreProvider {
                     streamThread.allTasks().values() : streamThread.activeTasks();
 
             if (storeQueryParams.partition() != null) {
-                return findStreamTask(tasks, storeName, storeQueryParams.partition()).
-                        map(streamTask ->
-                                validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
-                        map(Collections::singletonList).
-                        orElse(Collections.emptyList());
+                for (final Task task : tasks) {
+                    if (task.id().partition == storeQueryParams.partition() &&
+                        task.getStore(storeName) != null &&
+                        storeName.equals(task.getStore(storeName).name())) {
+                        final T typedStore = validateAndCastStores(task.getStore(storeName), queryableStoreType, storeName, task.id());
+                        return Collections.singletonList(typedStore);
+                    }
+                }
+                return Collections.emptyList();
             } else {
-                return tasks.stream().
-                        map(streamTask ->
-                                validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
-                        filter(Objects::nonNull).
-                        collect(Collectors.toList());
+                final List<T> list = new ArrayList<>();
+                for (final Task task : tasks) {
+                    final StateStore store = task.getStore(storeName);
+                    if (store == null) {
+                        // then this task doesn't have that store
+                    } else {
+                        final T typedStore = validateAndCastStores(store, queryableStoreType, storeName, task.id());
+                        list.add(typedStore);
+                    }
+                }
+                return list;
             }
         } else {
             throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
@@ -75,13 +83,18 @@ public class StreamThreadStateStoreProvider {
     }
 
     @SuppressWarnings("unchecked")
-    private <T> T validateAndListStores(final StateStore store, final QueryableStoreType<T> queryableStoreType, final String storeName, final TaskId taskId) {
-        if (store != null && queryableStoreType.accepts(store)) {
+    private static <T> T validateAndCastStores(final StateStore store,
+                                               final QueryableStoreType<T> queryableStoreType,
+                                               final String storeName,
+                                               final TaskId taskId) {
+        if (store == null) {
+            throw new NullPointerException("Expected store not to be null at this point.");
+        } else if (queryableStoreType.accepts(store)) {
             if (!store.isOpen()) {
                 throw new InvalidStateStoreException(
                         "Cannot get state store " + storeName + " for task " + taskId +
                             " because the store is not open. " +
-                            "The state store may have migrated to another instances.");
+                            "The state store may have migrated to another instance.");
             }
             if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
                 return (T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store);
@@ -91,15 +104,11 @@ public class StreamThreadStateStoreProvider {
                 return (T) store;
             }
         } else {
-            return null;
+            throw new InvalidStateStoreException(
+                "Cannot get state store " + storeName +
+                    " because the queryable store type [" + queryableStoreType.getClass() +
+                    "] does not accept the actual store type [" + store.getClass() + "]."
+            );
         }
     }
-
-    private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {
-        return tasks.stream().
-                filter(streamTask -> streamTask.id().partition == partition &&
-                        streamTask.getStore(storeName) != null &&
-                        storeName.equals(streamTask.getStore(storeName).name())).
-                findFirst();
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index ef9becf..f1a7749 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -1032,7 +1032,7 @@ public class TopologyTest {
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
                 // previously, this was
                 //   Processor: KTABLE-MAPVALUES-0000000004 (stores: [KTABLE-MAPVALUES-STATE-STORE-0000000003]
-                // but we added a change not to materialize non-queriable stores. This change shouldn't break compatibility.
+                // but we added a change not to materialize non-queryable stores. This change shouldn't break compatibility.
                 "    Processor: KTABLE-MAPVALUES-0000000004 (stores: [])\n" +
                 "      --> none\n" +
                 "      <-- KTABLE-SOURCE-0000000002\n" +
@@ -1099,7 +1099,7 @@ public class TopologyTest {
                 "      <-- KSTREAM-SOURCE-0000000001\n" +
                 // Previously, this was
                 //   Processor: KTABLE-FILTER-0000000004 (stores: [KTABLE-FILTER-STATE-STORE-0000000003]
-                // but we added a change not to materialize non-queriable stores. This change shouldn't break compatibility.
+                // but we added a change not to materialize non-queryable stores. This change shouldn't break compatibility.
                 "    Processor: KTABLE-FILTER-0000000004 (stores: [])\n" +
                 "      --> none\n" +
                 "      <-- KTABLE-SOURCE-0000000002\n" +
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index e15788a..9acc879 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -16,12 +16,9 @@
  */
 package org.apache.kafka.streams.integration;
 
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -30,7 +27,6 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -42,7 +38,14 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
 import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThrows;
 
 @Category({IntegrationTest.class})
@@ -85,7 +88,7 @@ public class JoinStoreIntegrationTest {
     }
 
     @Test
-    public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {
+    public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueriable() throws InterruptedException {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-no-store-access");
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -108,7 +111,15 @@ public class JoinStoreIntegrationTest {
 
             kafkaStreams.start();
             latch.await();
-            assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get(1));
+            final InvalidStateStoreException exception =
+                assertThrows(
+                    InvalidStateStoreException.class,
+                    () -> kafkaStreams.store(fromNameAndType("join-store", keyValueStore()))
+                );
+            assertThat(
+                exception.getMessage(),
+                is("Cannot get state store join-store because no such store is registered in the topology.")
+            );
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index 11aa616..23870a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -62,13 +62,13 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
     private static final String RIGHT_TABLE = "right_table";
     private static final String OUTPUT = "output-topic";
     private final boolean materialized;
-    private final boolean queriable;
+    private final boolean queryable;
 
     private Properties streamsConfig;
 
-    public KTableKTableForeignKeyJoinMaterializationIntegrationTest(final boolean materialized, final boolean queriable) {
+    public KTableKTableForeignKeyJoinMaterializationIntegrationTest(final boolean materialized, final boolean queryable) {
         this.materialized = materialized;
-        this.queriable = queriable;
+        this.queryable = queryable;
     }
 
     @Rule
@@ -85,7 +85,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
     }
 
 
-    @Parameterized.Parameters(name = "materialized={0}, queriable={1}")
+    @Parameterized.Parameters(name = "materialized={0}, queryable={1}")
     public static Collection<Object[]> data() {
         return Arrays.asList(
             new Object[] {false, false},
@@ -108,7 +108,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
                 outputTopic.readKeyValuesToMap(),
                 is(emptyMap())
             );
-            if (materialized && queriable) {
+            if (materialized && queryable) {
                 assertThat(
                     asMap(store),
                     is(emptyMap())
@@ -119,7 +119,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
             // it's not possible to know whether a result was previously emitted.
             left.pipeInput("lhs1", (String) null);
             {
-                if (materialized && queriable) {
+                if (materialized && queryable) {
                     // in only this specific case, the record cache will actually be activated and
                     // suppress the unnecessary tombstone. This is because the cache is able to determine
                     // for sure that there has never been a previous result. (Because the "old" and "new" values
@@ -148,7 +148,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
                     outputTopic.readKeyValuesToMap(),
                     is(emptyMap())
                 );
-                if (materialized && queriable) {
+                if (materialized && queryable) {
                     assertThat(
                         asMap(store),
                         is(emptyMap())
@@ -175,7 +175,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
         final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
 
         final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized;
-        if (queriable) {
+        if (queryable) {
             materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableStoreName).withValueSerde(Serdes.String());
         } else {
             materialized = Materialized.with(null, Serdes.String());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index fe6ad61..b2680b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -29,13 +29,13 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KafkaStreamsTest;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KeyQueryMetadata;
-import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.LagInfo;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -48,17 +48,19 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.StreamsMetadata;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.NoRetryException;
 import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -97,16 +99,26 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
 import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getRunningStreams;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.sessionStore;
 import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
 import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
@@ -305,7 +317,7 @@ public class QueryableStateIntegrationTest {
                     final int index = queryMetadata.activeHost().port();
                     final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams;
                     final ReadOnlyKeyValueStore<String, Long> store =
-                        IntegrationTestUtils.getStore(storeName, streamsWithKey, true, QueryableStoreTypes.keyValueStore());
+                        IntegrationTestUtils.getStore(storeName, streamsWithKey, true, keyValueStore());
                     if (store == null) {
                         nullStoreKeys.add(key);
                         continue;
@@ -430,6 +442,88 @@ public class QueryableStateIntegrationTest {
     }
 
     @Test
+    public void shouldRejectNonExistentStoreName() {
+        // Topology with exactly one materialized store, named "input-table".
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(
+            "input",
+            Materialized
+                .<String, String, KeyValueStore<Bytes, byte[]>>as("input-table")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String())
+        );
+
+        final Properties properties = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(getClass(), testName)),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+        ));
+
+        try (final KafkaStreams streams = getStartedStreams(properties, builder, true)) {
+            // Sanity check: asking for the store that IS registered yields a non-null handle.
+            final ReadOnlyKeyValueStore<String, String> store =
+                streams.store(fromNameAndType("input-table", keyValueStore()));
+            assertThat(store, Matchers.notNullValue());
+
+            // Asking for a name that is not in the topology must fail at lookup time,
+            // with a message that names the missing store.
+            final InvalidStateStoreException exception = assertThrows(
+                InvalidStateStoreException.class,
+                () -> streams.store(fromNameAndType("no-table", keyValueStore()))
+            );
+            assertThat(
+                exception.getMessage(),
+                is("Cannot get state store no-table because no such store is registered in the topology.")
+            );
+        }
+    }
+
+    @Test
+    public void shouldRejectWronglyTypedStore() throws InterruptedException {
+        // Topic and store names are derived from the test name so they are unique to this test.
+        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        final String input = uniqueTestName + "-input";
+        final String storeName = uniqueTestName + "-input-table";
+
+        // Topology with a single table materialized as a key-value store.
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(
+            input,
+            Materialized
+                .<String, String, KeyValueStore<Bytes, byte[]>>as(storeName)
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String())
+        );
+
+        CLUSTER.createTopic(input);
+
+        final Properties properties = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, uniqueTestName + "-app"),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+        ));
+
+        try (final KafkaStreams streams = getRunningStreams(properties, builder, true)) {
+            // Sanity check: the store is retrievable with the matching (key-value) type.
+            final ReadOnlyKeyValueStore<String, String> store =
+                streams.store(fromNameAndType(storeName, keyValueStore()));
+            assertThat(store, Matchers.notNullValue());
+
+            // Note that to check the type we actually need a store reference,
+            // so we can't check when you get the IQ store, only when you
+            // try to use it. Presumably, this could be improved.
+            final ReadOnlySessionStore<String, String> sessionStore =
+                streams.store(fromNameAndType(storeName, sessionStore()));
+            // The mismatch surfaces on first use of the wrongly typed handle.
+            final InvalidStateStoreException exception = assertThrows(
+                InvalidStateStoreException.class,
+                () -> sessionStore.fetch("a")
+            );
+            // The message must name both the requested queryable type and the actual store class.
+            assertThat(
+                exception.getMessage(),
+                is(
+                    "Cannot get state store " + storeName + " because the queryable store type" +
+                        " [class org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreType]" +
+                        " does not accept the actual store type" +
+                        " [class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."
+                )
+            );
+        } finally {
+            // Remove the topic created above, whether or not the assertions passed.
+            CLUSTER.deleteAllTopicsAndWait(0L);
+        }
+    }
+
+    @Test
     public void shouldBeAbleToQueryDuringRebalance() throws Exception {
         final int numThreads = STREAM_TWO_PARTITIONS;
         final List<KafkaStreams> streamsList = new ArrayList<>(numThreads);
@@ -670,10 +764,10 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long> myFilterStore =
-            IntegrationTestUtils.getStore("queryFilter", kafkaStreams, QueryableStoreTypes.keyValueStore());
+            IntegrationTestUtils.getStore("queryFilter", kafkaStreams, keyValueStore());
 
         final ReadOnlyKeyValueStore<String, Long> myFilterNotStore =
-            IntegrationTestUtils.getStore("queryFilterNot", kafkaStreams, QueryableStoreTypes.keyValueStore());
+            IntegrationTestUtils.getStore("queryFilterNot", kafkaStreams, keyValueStore());
 
         for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
             TestUtils.waitForCondition(() -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)),
@@ -737,7 +831,7 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
 
         final ReadOnlyKeyValueStore<String, Long> myMapStore =
-            IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, QueryableStoreTypes.keyValueStore());
+            IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, keyValueStore());
 
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
@@ -786,7 +880,7 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long> myMapStore =
-            IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, QueryableStoreTypes.keyValueStore());
+            IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, keyValueStore());
 
         for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
             assertEquals(expectedEntry.value, myMapStore.get(expectedEntry.key));
@@ -847,7 +941,7 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long> myCount =
-            IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore());
+            IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
 
         final ReadOnlyWindowStore<String, Long> windowStore =
             IntegrationTestUtils.getStore(windowStoreName, kafkaStreams, QueryableStoreTypes.windowStore());
@@ -885,7 +979,7 @@ public class QueryableStateIntegrationTest {
         final int maxWaitMs = 30000;
 
         final ReadOnlyKeyValueStore<String, Long> store =
-            IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore());
+            IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
 
         TestUtils.waitForCondition(
             () -> Long.valueOf(8).equals(store.get("hello")),
@@ -903,7 +997,7 @@ public class QueryableStateIntegrationTest {
         TestUtils.waitForCondition(
             () -> {
                 try {
-                    assertEquals(8L, IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore()).get("hello"));
+                    assertEquals(8L, IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore()).get("hello"));
                     return true;
                 } catch (final InvalidStateStoreException ise) {
                     return false;
@@ -959,7 +1053,7 @@ public class QueryableStateIntegrationTest {
         final int maxWaitMs = 30000;
 
         final ReadOnlyKeyValueStore<String, String> store =
-            IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore());
+            IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
 
         TestUtils.waitForCondition(
             () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
@@ -982,7 +1076,7 @@ public class QueryableStateIntegrationTest {
             "wait for thread to fail");
 
         final ReadOnlyKeyValueStore<String, String> store2 =
-            IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore());
+            IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
 
         try {
             TestUtils.waitForCondition(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index d8fb5d2..03b287d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,10 +16,6 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import kafka.api.Request;
 import kafka.server.KafkaServer;
 import kafka.server.MetadataCache;
@@ -79,9 +75,13 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -1187,6 +1187,28 @@ public class IntegrationTestUtils {
         return driver;
     }
 
+    /**
+     * Creates a {@link KafkaStreams} instance for the given topology, starts it, and waits
+     * up to {@code DEFAULT_TIMEOUT} milliseconds for it to report {@link State#RUNNING}.
+     *
+     * @param streamsConfig the configuration for the new instance
+     * @param builder       the builder whose topology is run
+     * @param clean         whether to call {@link KafkaStreams#cleanUp()} before starting
+     * @return the started instance, once it has transitioned to RUNNING
+     * @throws RuntimeException if RUNNING is not reached within the timeout, or if the
+     *                          calling thread is interrupted while waiting
+     */
+    public static KafkaStreams getRunningStreams(final Properties streamsConfig,
+                                                 final StreamsBuilder builder,
+                                                 final boolean clean) {
+        final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
+        if (clean) {
+            driver.cleanUp();
+        }
+        final CountDownLatch latch = new CountDownLatch(1);
+        // The listener must be registered before start() so the RUNNING transition is not missed.
+        driver.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING) {
+                latch.countDown();
+            }
+        });
+        driver.start();
+        final boolean running;
+        try {
+            // await() returns false on timeout; it does NOT throw, so the result must be checked.
+            running = latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            // Don't leak the started instance, and preserve the interrupt for the caller.
+            driver.close();
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while waiting for Streams to start.", e);
+        }
+        if (!running) {
+            // The caller never receives the instance on this path, so close it here.
+            driver.close();
+            throw new RuntimeException("Streams didn't start in time.");
+        }
+        return driver;
+    }
+
     public static <S> S getStore(final String storeName,
                                  final KafkaStreams streams,
                                  final QueryableStoreType<S> storeType) throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 0bfecc7..7166bf5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -47,6 +47,9 @@ import java.util.regex.Pattern;
 
 import static java.time.Duration.ofSeconds;
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -1111,6 +1114,11 @@ public class InternalTopologyBuilderTest {
         );
         builder.initializeSubscription();
 
+        builder.rewriteTopology(new StreamsConfig(mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "asdf"),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf")
+        ))));
+
         assertThat(builder.buildGlobalStateTopology().storeToChangelogTopic().get(globalStoreName), is(globalTopic));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 884244d..8b463b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -76,8 +76,10 @@ import java.util.UUID;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class StreamThreadStateStoreProviderTest {
 
@@ -206,9 +208,19 @@ public class StreamThreadStateStoreProviderTest {
     @Test
     public void shouldNotFindKeyValueStoresAsTimestampedStore() {
         mockThread(true);
-        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
-                provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
-        assertEquals(0, tkvStores.size());
+        final InvalidStateStoreException exception = assertThrows(
+            InvalidStateStoreException.class,
+            () -> provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.timestampedKeyValueStore()))
+        );
+        assertThat(
+            exception.getMessage(),
+            is(
+                "Cannot get state store kv-store because the queryable store type " +
+                    "[class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedKeyValueStoreType] " +
+                    "does not accept the actual store type " +
+                    "[class org.apache.kafka.streams.state.internals.MeteredKeyValueStore]."
+            )
+        );
     }
 
     @Test
@@ -250,9 +262,19 @@ public class StreamThreadStateStoreProviderTest {
     @Test
     public void shouldNotFindWindowStoresAsTimestampedStore() {
         mockThread(true);
-        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
-            provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStore()));
-        assertEquals(0, windowStores.size());
+        final InvalidStateStoreException exception = assertThrows(
+            InvalidStateStoreException.class,
+            () -> provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStore()))
+        );
+        assertThat(
+            exception.getMessage(),
+            is(
+                "Cannot get state store window-store because the queryable store type " +
+                    "[class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedWindowStoreType] " +
+                    "does not accept the actual store type " +
+                    "[class org.apache.kafka.streams.state.internals.MeteredWindowStore]."
+            )
+        );
     }
 
     @Test
@@ -359,15 +381,6 @@ public class StreamThreadStateStoreProviderTest {
         );
     }
 
-    @Test
-    public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() {
-        mockThread(true);
-        assertEquals(
-            Collections.emptyList(),
-            provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.keyValueStore()))
-        );
-    }
-
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() {
         mockThread(false);