You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/02/10 21:20:45 UTC

[kafka] branch 2.5 updated: KAFKA-9487: Follow-up PR of Kafka-9445 (#8033)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 3e77458  KAFKA-9487: Follow-up PR of Kafka-9445 (#8033)
3e77458 is described below

commit 3e77458cfb0f7a7586f17777f0e775d91e4744e1
Author: Navinder Pal Singh Brar <na...@yahoo.com>
AuthorDate: Mon Feb 10 23:39:27 2020 +0530

    KAFKA-9487: Follow-up PR of Kafka-9445 (#8033)
    
    Follows up on the original PR for KAFKA-9445 to address a final round of feedback
    
    Cherry-pick of d76fa1b22d4b06e1f1a7700272ca963091f13931 from trunk
    
    Reviewers: John Roesler <vv...@apache.org>, Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 14 ++--
 ...eQueryParams.java => StoreQueryParameters.java} | 61 +++++++--------
 .../internals/InternalTopologyBuilder.java         |  4 +
 .../processor/internals/StreamsMetadataState.java  |  5 +-
 .../state/internals/QueryableStoreProvider.java    | 14 ++--
 .../internals/StreamThreadStateStoreProvider.java  | 53 ++++++++-----
 .../state/internals/WrappingStoreProvider.java     | 14 ++--
 .../streams/integration/EosIntegrationTest.java    |  4 +-
 .../GlobalKTableEOSIntegrationTest.java            | 10 +--
 .../integration/GlobalKTableIntegrationTest.java   | 18 ++---
 .../KStreamAggregationIntegrationTest.java         |  4 +-
 .../OptimizedKTableIntegrationTest.java            |  6 +-
 .../integration/QueryableStateIntegrationTest.java | 38 +++++-----
 .../integration/StoreQueryIntegrationTest.java     | 87 +++++++++++-----------
 .../integration/StoreUpgradeIntegrationTest.java   | 22 +++---
 .../StreamStreamJoinIntegrationTest.java           |  4 +-
 .../internals/InternalStreamsBuilderTest.java      |  4 +-
 .../CompositeReadOnlyKeyValueStoreTest.java        |  6 +-
 .../CompositeReadOnlySessionStoreTest.java         |  6 +-
 .../CompositeReadOnlyWindowStoreTest.java          |  6 +-
 .../internals/QueryableStoreProviderTest.java      | 16 ++--
 .../StreamThreadStateStoreProviderTest.java        | 32 ++++----
 .../state/internals/WrappingStoreProviderTest.java |  8 +-
 .../apache/kafka/test/StateStoreProviderStub.java  |  8 +-
 24 files changed, 225 insertions(+), 219 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index fdf38e2..2563718 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1162,27 +1162,25 @@ public class KafkaStreams implements AutoCloseable {
 
 
     /**
-     * @deprecated since 2.5 release; use {@link #store(StoreQueryParams)}  instead
+     * @deprecated since 2.5 release; use {@link #store(StoreQueryParameters)}  instead
      */
     @Deprecated
     public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
-        return store(StoreQueryParams.fromNameAndType(storeName, queryableStoreType));
+        return store(StoreQueryParameters.fromNameAndType(storeName, queryableStoreType));
     }
 
     /**
-     * Get a facade wrapping the local {@link StateStore} instances with the provided {@link StoreQueryParams}.
-     * StoreQueryParams need required parameters to be set, which are {@code storeName} and if
-     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
+     * Get a facade wrapping the local {@link StateStore} instances with the provided {@link StoreQueryParameters}.
      * The returned object can be used to query the {@link StateStore} instances.
      *
-     * @param storeQueryParams   to set the optional parameters to fetch type of stores user wants to fetch when a key is queried
+     * @param storeQueryParameters   the parameters used to fetch a queryable store
      * @return A facade wrapping the local {@link StateStore} instances
      * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
      * {@code queryableStoreType} doesn't exist
      */
-    public <T> T store(final StoreQueryParams<T> storeQueryParams) {
+    public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
         validateIsRunningOrRebalancing();
-        return queryableStoreProvider.getStore(storeQueryParams);
+        return queryableStoreProvider.getStore(storeQueryParameters);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/StoreQueryParams.java b/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
similarity index 54%
rename from streams/src/main/java/org/apache/kafka/streams/StoreQueryParams.java
rename to streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
index 2a6bbd8..332ac1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StoreQueryParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
@@ -21,60 +21,51 @@ import org.apache.kafka.streams.state.QueryableStoreType;
 import java.util.Objects;
 
 /**
- * Represents all the query options that a user can provide to state what kind of stores it is expecting.
- * The options would be whether a user would want to enable/disable stale stores
- * or whether it knows the list of partitions that it specifically wants to fetch.
- * If this information is not provided the default behavior is to fetch the stores for all the partitions
- * available on that instance for that particular store name.
- * It contains a partition, which for a point queries can be populated from the {@link KeyQueryMetadata}.
+ * {@code StoreQueryParameters} allows you to pass a variety of parameters when fetching a store for interactive query.
  */
-public class StoreQueryParams<T> {
+public class StoreQueryParameters<T> {
 
     private Integer partition;
     private boolean staleStores;
     private final String storeName;
     private final QueryableStoreType<T> queryableStoreType;
 
-    private StoreQueryParams(final String storeName, final QueryableStoreType<T>  queryableStoreType) {
+    private StoreQueryParameters(final String storeName, final QueryableStoreType<T>  queryableStoreType, final Integer partition, final boolean staleStores) {
         this.storeName = storeName;
         this.queryableStoreType = queryableStoreType;
+        this.partition = partition;
+        this.staleStores = staleStores;
     }
 
-    public static <T> StoreQueryParams<T> fromNameAndType(final String storeName,
+    public static <T> StoreQueryParameters<T> fromNameAndType(final String storeName,
                                                           final QueryableStoreType<T>  queryableStoreType) {
-        return new StoreQueryParams<T>(storeName, queryableStoreType);
+        return new StoreQueryParameters<T>(storeName, queryableStoreType, null, false);
     }
 
     /**
      * Set a specific partition that should be queried exclusively.
      *
-     * @param partition   The specific integer partition to be fetched from the stores list by using {@link StoreQueryParams}.
+     * @param partition   The specific integer partition to be fetched from the stores list by using {@link StoreQueryParameters}.
      *
-     * @return String storeName
+     * @return StoreQueryParameters a new {@code StoreQueryParameters} instance configured with the specified partition
      */
-    public StoreQueryParams<T> withPartition(final Integer partition) {
-        final StoreQueryParams<T> storeQueryParams = StoreQueryParams.fromNameAndType(this.storeName(), this.queryableStoreType());
-        storeQueryParams.partition = partition;
-        storeQueryParams.staleStores = this.staleStores;
-        return storeQueryParams;
+    public StoreQueryParameters<T> withPartition(final Integer partition) {
+        return new StoreQueryParameters<T>(storeName, queryableStoreType, partition, staleStores);
     }
 
     /**
      * Enable querying of stale state stores, i.e., allow to query active tasks during restore as well as standby tasks.
      *
-     * @return String storeName
+     * @return StoreQueryParameters a new {@code StoreQueryParameters} instance configured with serving from stale stores enabled
      */
-    public StoreQueryParams<T> enableStaleStores() {
-        final StoreQueryParams<T> storeQueryParams = StoreQueryParams.fromNameAndType(this.storeName(), this.queryableStoreType());
-        storeQueryParams.partition = this.partition;
-        storeQueryParams.staleStores = true;
-        return storeQueryParams;
+    public StoreQueryParameters<T> enableStaleStores() {
+        return new StoreQueryParameters<T>(storeName, queryableStoreType, partition, true);
     }
 
     /**
-     * Get the store name for which key is queried by the user.
+     * Get the name of the state store that should be queried.
      *
-     * @return String storeName
+     * @return String state store name
      */
     public String storeName() {
         return storeName;
@@ -83,16 +74,16 @@ public class StoreQueryParams<T> {
     /**
      * Get the queryable store type for which key is queried by the user.
      *
-     * @return QueryableStoreType queryableStoreType
+     * @return QueryableStoreType type of queryable store
      */
     public QueryableStoreType<T> queryableStoreType() {
         return queryableStoreType;
     }
 
     /**
-     * Get the partition to be used to fetch list of stores.
+     * Get the store partition that will be queried.
      * If the method returns {@code null}, it would mean that no specific partition has been requested,
-     * so all the local partitions for the store will be returned.
+     * so all the local partitions for the store will be queried.
      *
      * @return Integer partition
      */
@@ -111,19 +102,19 @@ public class StoreQueryParams<T> {
 
     @Override
     public boolean equals(final Object obj) {
-        if (!(obj instanceof StoreQueryParams)) {
+        if (!(obj instanceof StoreQueryParameters)) {
             return false;
         }
-        final StoreQueryParams storeQueryParams = (StoreQueryParams) obj;
-        return Objects.equals(storeQueryParams.partition, partition)
-                && Objects.equals(storeQueryParams.staleStores, staleStores)
-                && Objects.equals(storeQueryParams.storeName, storeName)
-                && Objects.equals(storeQueryParams.queryableStoreType, queryableStoreType);
+        final StoreQueryParameters storeQueryParameters = (StoreQueryParameters) obj;
+        return Objects.equals(storeQueryParameters.partition, partition)
+                && Objects.equals(storeQueryParameters.staleStores, staleStores)
+                && Objects.equals(storeQueryParameters.storeName, storeName)
+                && Objects.equals(storeQueryParameters.queryableStoreType, queryableStoreType);
     }
 
     @Override
     public String toString() {
-        return "StoreQueryParams {" +
+        return "StoreQueryParameters {" +
                 "partition=" + partition +
                 ", staleStores=" + staleStores +
                 ", storeName=" + storeName +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index eeb6c0d..bdbf9cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1148,6 +1148,10 @@ public class InternalTopologyBuilder {
         return results;
     }
 
+    public Collection<String> sourceTopicsForStore(final String storeName) {
+        return maybeDecorateInternalSourceTopics(stateStoreNameToSourceTopics.get(storeName));
+    }
+
     public synchronized Collection<Set<String>> copartitionGroups() {
         // compute transitive closures of copartitionGroups to relieve registering code to know all members
         // of a copartitionGroup at the same time
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 0de43aa..c746059 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
@@ -109,7 +110,7 @@ public class StreamsMetadataState {
             return allMetadata;
         }
 
-        final List<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
+        final Collection<String> sourceTopics = builder.sourceTopicsForStore(storeName);
         if (sourceTopics == null) {
             return Collections.emptyList();
         }
@@ -408,7 +409,7 @@ public class StreamsMetadataState {
     }
 
     private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
-        final List<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
+        final List<String> sourceTopics = builder.sourceTopicsForStore(storeName).stream().collect(Collectors.toList());
         if (sourceTopics == null || sourceTopics.isEmpty()) {
             return null;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 4458b26..8917164 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -42,29 +42,29 @@ public class QueryableStoreProvider {
      * Get a composite object wrapping the instances of the {@link StateStore} with the provided
      * storeName and {@link QueryableStoreType}
      *
-     * @param storeQueryParams       if stateStoresEnabled is used i.e. staleStoresEnabled is true, include standbys and recovering stores;
+     * @param storeQueryParameters       if stateStoresEnabled is used i.e. staleStoresEnabled is true, include standbys and recovering stores;
      *                                        if stateStoresDisabled i.e. staleStoresEnabled is false, only include running actives;
      *                                        if partition is null then it fetches all local partitions on the instance;
      *                                        if partition is set then it fetches a specific partition.
      * @param <T>                The expected type of the returned store
      * @return A composite object that wraps the store instances.
      */
-    public <T> T getStore(final StoreQueryParams<T> storeQueryParams) {
-        final String storeName = storeQueryParams.storeName();
-        final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
+    public <T> T getStore(final StoreQueryParameters<T> storeQueryParameters) {
+        final String storeName = storeQueryParameters.storeName();
+        final QueryableStoreType<T> queryableStoreType = storeQueryParameters.queryableStoreType();
         final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
         if (!globalStore.isEmpty()) {
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
-            allStores.addAll(storeProvider.stores(storeQueryParams));
+            allStores.addAll(storeProvider.stores(storeQueryParameters));
         }
         if (allStores.isEmpty()) {
             throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
         }
         return queryableStoreType.create(
-            new WrappingStoreProvider(storeProviders, storeQueryParams),
+            new WrappingStoreProvider(storeProviders, storeQueryParameters),
             storeName
         );
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index eeb852d..e1b8412 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -47,7 +47,7 @@ public class StreamThreadStateStoreProvider {
     }
 
     @SuppressWarnings("unchecked")
-    public <T> List<T> stores(final StoreQueryParams storeQueryParams) {
+    public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
         final String storeName = storeQueryParams.storeName();
         final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
         final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition());
@@ -58,24 +58,16 @@ public class StreamThreadStateStoreProvider {
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
             final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTasks();
             final List<T> stores = new ArrayList<>();
-            for (final Task streamTask : tasks.values()) {
-                if (keyTaskId != null && !keyTaskId.equals(streamTask.id())) {
-                    continue;
+            if (keyTaskId != null) {
+                final T store = validateAndListStores(tasks.get(keyTaskId).getStore(storeName), queryableStoreType, storeName, keyTaskId);
+                if (store != null) {
+                    return Collections.singletonList(store);
                 }
-                final StateStore store = streamTask.getStore(storeName);
-                if (store != null && queryableStoreType.accepts(store)) {
-                    if (!store.isOpen()) {
-                        throw new InvalidStateStoreException(
-                            "Cannot get state store " + storeName + " for task " + streamTask +
-                                " because the store is not open. " +
-                                "The state store may have migrated to another instances.");
-                    }
-                    if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
-                        stores.add((T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store));
-                    } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
-                        stores.add((T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store));
-                    } else {
-                        stores.add((T) store);
+            } else {
+                for (final Task streamTask : tasks.values()) {
+                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
+                    if (store != null) {
+                        stores.add(store);
                     }
                 }
             }
@@ -87,6 +79,27 @@ public class StreamThreadStateStoreProvider {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private <T> T validateAndListStores(final StateStore store, final QueryableStoreType<T> queryableStoreType, final String storeName, final TaskId taskId) {
+        if (store != null && queryableStoreType.accepts(store)) {
+            if (!store.isOpen()) {
+                throw new InvalidStateStoreException(
+                    "Cannot get state store " + storeName + " for task " + taskId +
+                        " because the store is not open. " +
+                        "The state store may have migrated to another instances.");
+            }
+            if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
+                return (T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store);
+            } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
+                return (T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store);
+            } else {
+                return (T) store;
+            }
+        } else {
+            return null;
+        }
+    }
+
     private TaskId createKeyTaskId(final String storeName, final Integer partition) {
         if (partition == null) {
             return null;
@@ -100,6 +113,6 @@ public class StreamThreadStateStoreProvider {
             }
         }
         throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " + partition + "is" +
-                                                "not available on this instance");
+                                                 "not available on this instance");
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index 2caee8b..5c9ae1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
@@ -29,17 +29,17 @@ import java.util.List;
 public class WrappingStoreProvider implements StateStoreProvider {
 
     private final List<StreamThreadStateStoreProvider> storeProviders;
-    private StoreQueryParams storeQueryParams;
+    private StoreQueryParameters storeQueryParameters;
 
     WrappingStoreProvider(final List<StreamThreadStateStoreProvider> storeProviders,
-                          final StoreQueryParams storeQueryParams) {
+                          final StoreQueryParameters storeQueryParameters) {
         this.storeProviders = storeProviders;
-        this.storeQueryParams = storeQueryParams;
+        this.storeQueryParameters = storeQueryParameters;
     }
 
     //visible for testing
-    public void setStoreQueryParams(final StoreQueryParams storeQueryParams) {
-        this.storeQueryParams = storeQueryParams;
+    public void setStoreQueryParameters(final StoreQueryParameters storeQueryParameters) {
+        this.storeQueryParameters = storeQueryParameters;
     }
 
     @Override
@@ -47,7 +47,7 @@ public class WrappingStoreProvider implements StateStoreProvider {
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParams);
+            final List<T> stores = provider.stores(storeQueryParameters);
             allStores.addAll(stores);
         }
         if (allStores.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index ad7ada9..4ff847f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -751,7 +751,7 @@ public class EosIntegrationTest {
         final long maxWaitingTime = System.currentTimeMillis() + 300000L;
         while (System.currentTimeMillis() < maxWaitingTime) {
             try {
-                store = streams.store(StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
+                store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
                 break;
             } catch (final InvalidStateStoreException okJustRetry) {
                 try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index d770f01..6dec94b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -139,7 +139,7 @@ public class GlobalKTableEOSIntegrationTest {
         produceGlobalTableValues();
 
         final ReadOnlyKeyValueStore<Long, String> replicatedStore =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
 
         TestUtils.waitForCondition(
             () -> "J".equals(replicatedStore.get(5L)),
@@ -183,7 +183,7 @@ public class GlobalKTableEOSIntegrationTest {
         produceGlobalTableValues();
 
         final ReadOnlyKeyValueStore<Long, String> replicatedStore =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
 
         TestUtils.waitForCondition(
             () -> "J".equals(replicatedStore.get(5L)),
@@ -220,7 +220,7 @@ public class GlobalKTableEOSIntegrationTest {
             () -> {
                 final ReadOnlyKeyValueStore<Long, String> store;
                 try {
-                    store = kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+                    store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
                 } catch (final InvalidStateStoreException ex) {
                     return false;
                 }
@@ -254,7 +254,7 @@ public class GlobalKTableEOSIntegrationTest {
             () -> {
                 final ReadOnlyKeyValueStore<Long, String> store;
                 try {
-                    store = kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+                    store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
                 } catch (final InvalidStateStoreException ex) {
                     return false;
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 638134d..d9d6e9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -142,7 +142,7 @@ public class GlobalKTableIntegrationTest {
         produceGlobalTableValues();
 
         final ReadOnlyKeyValueStore<Long, String> replicatedStore =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
 
         TestUtils.waitForCondition(
             () -> "J".equals(replicatedStore.get(5L)),
@@ -150,7 +150,7 @@ public class GlobalKTableIntegrationTest {
             "waiting for data in replicated store");
 
         final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
         assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));
 
         firstTimestamp = mockTime.milliseconds();
@@ -209,7 +209,7 @@ public class GlobalKTableIntegrationTest {
         produceGlobalTableValues();
 
         final ReadOnlyKeyValueStore<Long, String> replicatedStore =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
 
         TestUtils.waitForCondition(
             () -> "J".equals(replicatedStore.get(5L)),
@@ -217,7 +217,7 @@ public class GlobalKTableIntegrationTest {
             "waiting for data in replicated store");
 
         final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
         assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));
 
         firstTimestamp = mockTime.milliseconds();
@@ -254,17 +254,17 @@ public class GlobalKTableIntegrationTest {
         produceInitialGlobalTableValues();
 
         startStreams();
-        ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+        ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
         assertThat(store.approximateNumEntries(), equalTo(4L));
         ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> timestampedStore =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
         assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
         kafkaStreams.close();
 
         startStreams();
-        store = kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+        store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
         assertThat(store.approximateNumEntries(), equalTo(4L));
-        timestampedStore = kafkaStreams.store(StoreQueryParams.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
+        timestampedStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
         assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index a23e59a..3aeaf4d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KeyValueTimestamp;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -674,7 +674,7 @@ public class KStreamAggregationIntegrationTest {
 
         // verify can query data via IQ
         final ReadOnlySessionStore<String, String> sessionStore =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(userSessionsStore, QueryableStoreTypes.sessionStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(userSessionsStore, QueryableStoreTypes.sessionStore()));
         final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob");
         assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
         assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index acdf801..31dcc61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -46,7 +46,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KeyQueryMetadata;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -153,10 +153,10 @@ public class OptimizedKTableIntegrationTest {
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
         final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1
-            .store(StoreQueryParams.fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()));
+            .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()));
 
         final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2
-            .store(StoreQueryParams.fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()));
+            .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()));
 
         final boolean kafkaStreams1WasFirstActive;
         final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index f7dc302..b9f131d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -29,13 +29,13 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.KafkaStreamsTest;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.KeyQueryMetadata;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.KafkaStreamsTest;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyQueryMetadata;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.LagInfo;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -307,7 +307,7 @@ public class QueryableStateIntegrationTest {
                     final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams;
                     final QueryableStoreType<ReadOnlyKeyValueStore<String, Long>> queryableStoreType = QueryableStoreTypes.keyValueStore();
                     final ReadOnlyKeyValueStore<String, Long> store =
-                        streamsWithKey.store(StoreQueryParams.fromNameAndType(storeName, queryableStoreType).enableStaleStores());
+                        streamsWithKey.store(StoreQueryParameters.fromNameAndType(storeName, queryableStoreType).enableStaleStores());
                     if (store == null) {
                         nullStoreKeys.add(key);
                         continue;
@@ -367,7 +367,7 @@ public class QueryableStateIntegrationTest {
                     final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams;
                     final QueryableStoreType<ReadOnlyWindowStore<String, Long>> queryableWindowStoreType = QueryableStoreTypes.windowStore();
                     final ReadOnlyWindowStore<String, Long> store =
-                        streamsWithKey.store(StoreQueryParams.fromNameAndType(storeName, queryableWindowStoreType).enableStaleStores());
+                        streamsWithKey.store(StoreQueryParameters.fromNameAndType(storeName, queryableWindowStoreType).enableStaleStores());
                     if (store == null) {
                         nullStoreKeys.add(key);
                         continue;
@@ -650,10 +650,10 @@ public class QueryableStateIntegrationTest {
             waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
 
             final ReadOnlyKeyValueStore<String, Long> keyValueStore =
-                kafkaStreams.store(StoreQueryParams.fromNameAndType(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore()));
+                kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore()));
 
             final ReadOnlyWindowStore<String, Long> windowStore =
-                kafkaStreams.store(StoreQueryParams.fromNameAndType(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.windowStore()));
+                kafkaStreams.store(StoreQueryParameters.fromNameAndType(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.windowStore()));
 
             final Map<String, Long> expectedWindowState = new HashMap<>();
             final Map<String, Long> expectedCount = new HashMap<>();
@@ -716,9 +716,9 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
-            myFilterStore = kafkaStreams.store(StoreQueryParams.fromNameAndType("queryFilter", QueryableStoreTypes.keyValueStore()));
+            myFilterStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType("queryFilter", QueryableStoreTypes.keyValueStore()));
         final ReadOnlyKeyValueStore<String, Long>
-            myFilterNotStore = kafkaStreams.store(StoreQueryParams.fromNameAndType("queryFilterNot", QueryableStoreTypes.keyValueStore()));
+            myFilterNotStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType("queryFilterNot", QueryableStoreTypes.keyValueStore()));
 
         for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
             TestUtils.waitForCondition(() -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)),
@@ -782,7 +782,7 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
 
         final ReadOnlyKeyValueStore<String, Long> myMapStore =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType("queryMapValues", QueryableStoreTypes.keyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType("queryMapValues", QueryableStoreTypes.keyValueStore()));
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
         }
@@ -830,7 +830,7 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
-            myMapStore = kafkaStreams.store(StoreQueryParams.fromNameAndType("queryMapValues",
+            myMapStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType("queryMapValues",
             QueryableStoreTypes.keyValueStore()));
         for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
             assertEquals(expectedEntry.value, myMapStore.get(expectedEntry.key));
@@ -891,10 +891,10 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
-            myCount = kafkaStreams.store(StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
+            myCount = kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
 
         final ReadOnlyWindowStore<String, Long> windowStore =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(windowStoreName, QueryableStoreTypes.windowStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(windowStoreName, QueryableStoreTypes.windowStore()));
         verifyCanGetByKey(keys,
             expectedCount,
             expectedCount,
@@ -933,7 +933,7 @@ public class QueryableStateIntegrationTest {
             "waiting for store " + storeName);
 
         final ReadOnlyKeyValueStore<String, Long> store =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
 
         TestUtils.waitForCondition(
             () -> Long.valueOf(8).equals(store.get("hello")),
@@ -953,7 +953,7 @@ public class QueryableStateIntegrationTest {
                 try {
                     assertEquals(
                         Long.valueOf(8L),
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.<String, Long>keyValueStore())).get("hello"));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.<String, Long>keyValueStore())).get("hello"));
                     return true;
                 } catch (final InvalidStateStoreException ise) {
                     return false;
@@ -974,7 +974,7 @@ public class QueryableStateIntegrationTest {
         @Override
         public boolean conditionMet() {
             try {
-                kafkaStreams.store(StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.<String, Long>keyValueStore()));
+                kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.<String, Long>keyValueStore()));
                 return true;
             } catch (final InvalidStateStoreException ise) {
                 return false;
@@ -1032,7 +1032,7 @@ public class QueryableStateIntegrationTest {
             "waiting for store " + storeName);
 
         final ReadOnlyKeyValueStore<String, String> store =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
 
         TestUtils.waitForCondition(
             () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
@@ -1059,7 +1059,7 @@ public class QueryableStateIntegrationTest {
             "waiting for store " + storeName);
 
         final ReadOnlyKeyValueStore<String, String> store2 =
-            kafkaStreams.store(StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
+            kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
 
         try {
             TestUtils.waitForCondition(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index c03998f..ec820f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -24,11 +24,10 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyQueryMetadata;
-import org.apache.kafka.streams.StoreQueryParams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -91,7 +90,7 @@ public class StoreQueryIntegrationTest {
     }
 
     @Test
-    public void shouldQueryAllActivePartitionStoresByDefault() throws Exception {
+    public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
         final int batch1NumMessages = 100;
         final int key = 1;
         final Semaphore semaphore = new Semaphore(0);
@@ -117,10 +116,10 @@ public class StoreQueryIntegrationTest {
 
         final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
         final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1
-                .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType));
+                .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType));
 
         final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2
-                .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType));
+                .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType));
 
         final boolean kafkaStreams1IsActive;
         if ((keyQueryMetadata.getActiveHost().port() % 2) == 1) {
@@ -164,28 +163,28 @@ public class StoreQueryIntegrationTest {
 
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+        final boolean kafkaStreams1IsActive;
+        if ((keyQueryMetadata.getActiveHost().port() % 2) == 1) {
+            kafkaStreams1IsActive = true;
+        } else {
+            kafkaStreams1IsActive = false;
+        }
+
         final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
         ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
         ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        try {
+        if (kafkaStreams1IsActive) {
             store1 = kafkaStreams1
-                    .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyPartition));
-        } catch (final InvalidStateStoreException exception) {
-        //Only one among kafkaStreams1 and kafkaStreams2 will contain the specific active store requested. The other will throw exception
-        }
-        try {
+                    .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyPartition));
+        } else {
             store2 = kafkaStreams2
-                    .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyPartition));
-        } catch (final InvalidStateStoreException exception) {
-            //Only one among kafkaStreams1 and kafkaStreams2 will contain the specific active store requested. The other will throw exception
+                    .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyPartition));
         }
-        final boolean kafkaStreams1IsActive;
-        if ((keyQueryMetadata.getActiveHost().port() % 2) == 1) {
-            kafkaStreams1IsActive = true;
+
+        if (kafkaStreams1IsActive) {
             assertThat(store1, is(notNullValue()));
             assertThat(store2, is(nullValue()));
         } else {
-            kafkaStreams1IsActive = false;
             assertThat(store2, is(notNullValue()));
             assertThat(store1, is(nullValue()));
         }
@@ -195,17 +194,12 @@ public class StoreQueryIntegrationTest {
 
         ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
         ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        try {
+        if (!kafkaStreams1IsActive) {
             store3 = kafkaStreams1
-                    .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyDontBelongPartition));
-        } catch (final InvalidStateStoreException exception) {
-            //Only one among kafkaStreams1 and kafkaStreams2 will contain the specific active store requested. The other will throw exception
-        }
-        try {
+                    .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyDontBelongPartition));
+        } else {
             store4 = kafkaStreams2
-                    .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyDontBelongPartition));
-        } catch (final InvalidStateStoreException exception) {
-            //Only one among kafkaStreams1 and kafkaStreams2 will contain the specific active store requested. The other will throw exception
+                    .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyDontBelongPartition));
         }
 
         // Assert that key is not served when wrong specific partition is requested
@@ -239,15 +233,18 @@ public class StoreQueryIntegrationTest {
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
         final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1
-                .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores());
-
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2
-                .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores());
 
         // Assert that both active and standby are able to query for a key
-        assertThat(store1.get(key), is(notNullValue()));
-        assertThat(store2.get(key), is(notNullValue()));
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1
+                .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores());
+            return store1.get(key) != null;
+        }, "store1 cannot find results for key");
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2
+                .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores());
+            return store2.get(key) != null;
+        }, "store2 cannot find results for key");
     }
 
     @Test
@@ -281,22 +278,24 @@ public class StoreQueryIntegrationTest {
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
         final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1
-                .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyPartition));
-
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2
-                .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyPartition));
 
         // Assert that both active and standby are able to query for a key
-        assertThat(store1.get(key), is(notNullValue()));
-        assertThat(store2.get(key), is(notNullValue()));
-
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1
+                .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyPartition));
+            return store1.get(key) != null;
+        }, "store1 cannot find results for key");
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2
+                .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyPartition));
+            return store2.get(key) != null;
+        }, "store2 cannot find results for key");
 
         final ReadOnlyKeyValueStore<Integer, Integer> store3 = kafkaStreams1
-                .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyDontBelongPartition));
+                .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyDontBelongPartition));
 
         final ReadOnlyKeyValueStore<Integer, Integer> store4 = kafkaStreams2
-                .store(StoreQueryParams.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyDontBelongPartition));
+                .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyDontBelongPartition));
 
         // Assert that
         assertThat(store3.get(key), is(nullValue()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 3fa5dbc..02a0b57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -20,10 +20,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -334,7 +334,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, V> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
                     try (final KeyValueIterator<K, V> all = store.all()) {
                         final List<KeyValue<K, V>> storeContent = new LinkedList<>();
                         while (all.hasNext()) {
@@ -359,7 +359,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()));
                     final ValueAndTimestamp<Long> count = store.get(key);
                     return count.value() == value && count.timestamp() == timestamp;
                 } catch (final Exception swallow) {
@@ -378,7 +378,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()));
                     final ValueAndTimestamp<Long> count = store.get(key);
                     return count.value() == value && count.timestamp() == -1L;
                 } catch (final Exception swallow) {
@@ -408,7 +408,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()));
                     try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
                         final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
                         while (all.hasNext()) {
@@ -443,7 +443,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()));
                     try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
                         final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
                         while (all.hasNext()) {
@@ -809,7 +809,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyWindowStore<K, V> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.windowStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.windowStore()));
                     try (final KeyValueIterator<Windowed<K>, V> all = store.all()) {
                         final List<KeyValue<Windowed<K>, V>> storeContent = new LinkedList<>();
                         while (all.hasNext()) {
@@ -833,7 +833,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()));
                     final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
                     return count.value() == value && count.timestamp() == -1L;
                 } catch (final Exception swallow) {
@@ -853,7 +853,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()));
                     final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
                     return count.value() == value && count.timestamp() == timestamp;
                 } catch (final Exception swallow) {
@@ -883,7 +883,7 @@ public class StoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> store =
-                        kafkaStreams.store(StoreQueryParams.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()));
+                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()));
                     try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all = store.all()) {
                         final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
                         while (all.hasNext()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index f69a7b0..dcb5ae3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -94,7 +94,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
 
             kafkaStreams.start();
             latch.await();
-            assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParams.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())));
+            assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index d06b028..2e66add 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -257,8 +257,8 @@ public class InternalStreamsBuilderTest {
         mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.as("count"));
         builder.buildAndOptimizeTopology();
         builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)));
-        assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
-        assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count"));
+        assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.sourceTopicsForStore("table-store"));
+        assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.internalTopologyBuilder.sourceTopicsForStore("count"));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index b16f3bb..67b876a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -66,7 +66,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
         stubProviderOne.addStore("other-store", otherUnderlyingStore);
         final QueryableStoreType<ReadOnlyKeyValueStore<Object, Object>> queryableStoreType = QueryableStoreTypes.keyValueStore();
         theStore = new CompositeReadOnlyKeyValueStore<>(
-            new WrappingStoreProvider(asList(stubProviderOne, stubProviderTwo), StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())),
+            new WrappingStoreProvider(asList(stubProviderOne, stubProviderTwo), StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())),
             QueryableStoreTypes.keyValueStore(),
             storeName
         );
@@ -299,7 +299,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
     private CompositeReadOnlyKeyValueStore<Object, Object> rebalancing() {
         final QueryableStoreType<ReadOnlyKeyValueStore<Object, Object>> queryableStoreType = QueryableStoreTypes.keyValueStore();
         return new CompositeReadOnlyKeyValueStore<>(
-            new WrappingStoreProvider(singletonList(new StateStoreProviderStub(true)), StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())),
+            new WrappingStoreProvider(singletonList(new StateStoreProviderStub(true)), StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())),
             QueryableStoreTypes.keyValueStore(),
             storeName
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index 5e309e7..e1957f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
@@ -58,7 +58,7 @@ public class CompositeReadOnlySessionStoreTest {
         final QueryableStoreType<ReadOnlySessionStore<Object, Object>> queryableStoreType = QueryableStoreTypes.sessionStore();
 
         sessionStore = new CompositeReadOnlySessionStore<>(
-            new WrappingStoreProvider(Arrays.asList(stubProviderOne, stubProviderTwo), StoreQueryParams.fromNameAndType(storeName, queryableStoreType)),
+            new WrappingStoreProvider(Arrays.asList(stubProviderOne, stubProviderTwo), StoreQueryParameters.fromNameAndType(storeName, queryableStoreType)),
             QueryableStoreTypes.sessionStore(), storeName);
     }
 
@@ -113,7 +113,7 @@ public class CompositeReadOnlySessionStoreTest {
         final QueryableStoreType<ReadOnlySessionStore<Object, Object>> queryableStoreType = QueryableStoreTypes.sessionStore();
         final CompositeReadOnlySessionStore<String, String> store =
             new CompositeReadOnlySessionStore<>(
-                new WrappingStoreProvider(singletonList(new StateStoreProviderStub(true)), StoreQueryParams.fromNameAndType("whateva", queryableStoreType)),
+                new WrappingStoreProvider(singletonList(new StateStoreProviderStub(true)), StoreQueryParameters.fromNameAndType("whateva", queryableStoreType)),
                 QueryableStoreTypes.sessionStore(),
                 "whateva"
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 8aeb3a0..514d094 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -68,7 +68,7 @@ public class CompositeReadOnlyWindowStoreTest {
         stubProviderOne.addStore("other-window-store", otherUnderlyingStore);
 
         windowStore = new CompositeReadOnlyWindowStore<>(
-            new WrappingStoreProvider(asList(stubProviderOne, stubProviderTwo), StoreQueryParams.fromNameAndType(storeName, QueryableStoreTypes.windowStore())),
+            new WrappingStoreProvider(asList(stubProviderOne, stubProviderTwo), StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.windowStore())),
                 QueryableStoreTypes.windowStore(),
                 storeName
         );
@@ -140,7 +140,7 @@ public class CompositeReadOnlyWindowStoreTest {
         underlyingWindowStore.setOpen(false);
         final CompositeReadOnlyWindowStore<Object, Object> store =
                 new CompositeReadOnlyWindowStore<>(
-                    new WrappingStoreProvider(singletonList(stubProviderOne), StoreQueryParams.fromNameAndType("window-store", QueryableStoreTypes.windowStore())),
+                    new WrappingStoreProvider(singletonList(stubProviderOne), StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.windowStore())),
                     QueryableStoreTypes.windowStore(),
                     "window-store"
                 );
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index 0da0714..19a0355 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.NoOpWindowStore;
@@ -54,38 +54,38 @@ public class QueryableStoreProviderTest {
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowExceptionIfKVStoreDoesntExist() {
-        storeProvider.getStore(StoreQueryParams.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore()));
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore()));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowExceptionIfWindowStoreDoesntExist() {
-        storeProvider.getStore(StoreQueryParams.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore()));
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore()));
     }
 
     @Test
     public void shouldReturnKVStoreWhenItExists() {
-        assertNotNull(storeProvider.getStore(StoreQueryParams.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())));
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())));
     }
 
     @Test
     public void shouldReturnWindowStoreWhenItExists() {
-        assertNotNull(storeProvider.getStore(StoreQueryParams.fromNameAndType(windowStore, QueryableStoreTypes.windowStore())));
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.windowStore())));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() {
-        storeProvider.getStore(StoreQueryParams.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore()));
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore()));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() {
-        storeProvider.getStore(StoreQueryParams.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore()));
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore()));
     }
 
     @Test
     public void shouldFindGlobalStores() {
         globalStateStores.put("global", new NoOpReadOnlyStore<>());
-        assertNotNull(storeProvider.getStore(StoreQueryParams.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
     }
 
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index b2f5f4e..62de9e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -163,7 +163,7 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldFindKeyValueStores() {
         mockThread(true);
         final List<ReadOnlyKeyValueStore<String, String>> kvStores =
-            provider.stores(StoreQueryParams.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()));
+            provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()));
         assertEquals(2, kvStores.size());
         for (final ReadOnlyKeyValueStore<String, String> store: kvStores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
@@ -175,7 +175,7 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldFindTimestampedKeyValueStores() {
         mockThread(true);
         final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
-            provider.stores(StoreQueryParams.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
+            provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
         assertEquals(2, tkvStores.size());
         for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store: tkvStores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
@@ -187,7 +187,7 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldNotFindKeyValueStoresAsTimestampedStore() {
         mockThread(true);
         final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
-                provider.stores(StoreQueryParams.fromNameAndType("kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
+                provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
         assertEquals(0, tkvStores.size());
     }
 
@@ -195,7 +195,7 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() {
         mockThread(true);
         final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
-                provider.stores(StoreQueryParams.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.keyValueStore()));
+                provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.keyValueStore()));
         assertEquals(2, tkvStores.size());
         for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store: tkvStores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
@@ -207,7 +207,7 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldFindWindowStores() {
         mockThread(true);
         final List<ReadOnlyWindowStore<String, String>> windowStores =
-            provider.stores(StoreQueryParams.fromNameAndType("window-store", QueryableStoreTypes.windowStore()));
+            provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.windowStore()));
         assertEquals(2, windowStores.size());
         for (final ReadOnlyWindowStore<String, String> store: windowStores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
@@ -219,7 +219,7 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldFindTimestampedWindowStores() {
         mockThread(true);
         final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
-            provider.stores(StoreQueryParams.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore()));
+            provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore()));
         assertEquals(2, windowStores.size());
         for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store: windowStores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
@@ -231,7 +231,7 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldNotFindWindowStoresAsTimestampedStore() {
         mockThread(true);
         final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
-            provider.stores(StoreQueryParams.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStore()));
+            provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStore()));
         assertEquals(0, windowStores.size());
     }
 
@@ -239,7 +239,7 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldFindTimestampedWindowStoresAsWindowStore() {
         mockThread(true);
         final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
-            provider.stores(StoreQueryParams.fromNameAndType("timestamped-window-store", QueryableStoreTypes.windowStore()));
+            provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.windowStore()));
         assertEquals(2, windowStores.size());
         for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store: windowStores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
@@ -251,28 +251,28 @@ public class StreamThreadStateStoreProviderTest {
     public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
         mockThread(true);
         taskOne.getStore("kv-store").close();
-        provider.stores(StoreQueryParams.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()));
+        provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() {
         mockThread(true);
         taskOne.getStore("timestamped-kv-store").close();
-        provider.stores(StoreQueryParams.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
+        provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() {
         mockThread(true);
         taskOne.getStore("window-store").close();
-        provider.stores(StoreQueryParams.fromNameAndType("window-store", QueryableStoreTypes.windowStore()));
+        provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.windowStore()));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() {
         mockThread(true);
         taskOne.getStore("timestamped-window-store").close();
-        provider.stores(StoreQueryParams.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore()));
+        provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore()));
     }
 
     @Test
@@ -280,7 +280,7 @@ public class StreamThreadStateStoreProviderTest {
         mockThread(true);
         assertEquals(
             Collections.emptyList(),
-            provider.stores(StoreQueryParams.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())));
+            provider.stores(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())));
     }
 
     @Test
@@ -288,14 +288,14 @@ public class StreamThreadStateStoreProviderTest {
         mockThread(true);
         assertEquals(
             Collections.emptyList(),
-            provider.stores(StoreQueryParams.fromNameAndType("window-store", QueryableStoreTypes.keyValueStore()))
+            provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.keyValueStore()))
         );
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() {
         mockThread(false);
-        provider.stores(StoreQueryParams.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()));
+        provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()));
     }
 
     private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index 7c66509..ceb3f79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.NoOpWindowStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -57,7 +57,7 @@ public class WrappingStoreProviderTest {
         stubProviderTwo.addStore("window", new NoOpWindowStore());
         wrappingStoreProvider = new WrappingStoreProvider(
             Arrays.asList(stubProviderOne, stubProviderTwo),
-            StoreQueryParams.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())
+            StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())
         );
     }
 
@@ -70,7 +70,7 @@ public class WrappingStoreProviderTest {
 
     @Test
     public void shouldFindWindowStores() {
-        wrappingStoreProvider.setStoreQueryParams(StoreQueryParams.fromNameAndType("window", windowStore()));
+        wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("window", windowStore()));
         final List<ReadOnlyWindowStore<Object, Object>>
                 windowStores =
                 wrappingStoreProvider.stores("window", windowStore());
@@ -79,7 +79,7 @@ public class WrappingStoreProviderTest {
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowInvalidStoreExceptionIfNoStoreOfTypeFound() {
-        wrappingStoreProvider.setStoreQueryParams(StoreQueryParams.fromNameAndType("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore()));
+        wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore()));
         wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore());
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
index 28835d2..13a29e1 100644
--- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.streams.StoreQueryParams;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -39,9 +39,9 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
 
     @SuppressWarnings("unchecked")
     @Override
-    public <T> List<T> stores(final StoreQueryParams storeQueryParams) {
-        final String storeName = storeQueryParams.storeName();
-        final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
+    public <T> List<T> stores(final StoreQueryParameters storeQueryParameters) {
+        final String storeName = storeQueryParameters.storeName();
+        final QueryableStoreType<T> queryableStoreType = storeQueryParameters.queryableStoreType();
         if (throwException) {
             throw new InvalidStateStoreException("store is unavailable");
         }