Posted to commits@kafka.apache.org by vv...@apache.org on 2021/11/24 05:10:39 UTC

[kafka] 01/03: POC of the KIP-796 IQv2 proposal

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 3e308374fc1289c69e28acc0a8a96fa35c3d56fd
Author: John Roesler <vv...@apache.org>
AuthorDate: Thu Oct 14 12:53:11 2021 -0500

    POC of the KIP-796 IQv2 proposal
    
    fix test
    
    Prototype of using KeyQuery against the Windowed store
    
    updates
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 125 +++++-
 .../apache/kafka/streams/processor/StateStore.java |  25 ++
 .../kafka/streams/processor/StateStoreContext.java |   4 +-
 .../apache/kafka/streams/query/FailureReason.java  |  44 ++
 .../streams/query/InteractiveQueryRequest.java     | 157 +++++++
 .../streams/query/InteractiveQueryResult.java      |  72 ++++
 .../org/apache/kafka/streams/query/Iterators.java  |  82 ++++
 .../KeyQuery.java}                                 |  26 +-
 .../org/apache/kafka/streams/query/Position.java   | 192 +++++++++
 .../apache/kafka/streams/query/PositionBound.java  |  80 ++++
 .../Query.java}                                    |  16 +-
 .../apache/kafka/streams/query/QueryResult.java    | 150 +++++++
 .../RawKeyQuery.java}                              |  32 +-
 .../RawScanQuery.java}                             |  21 +-
 .../apache/kafka/streams/state/StateSerdes.java    |   9 +
 .../AbstractRocksDBSegmentedBytesStore.java        |  18 +
 .../state/internals/InMemoryKeyValueStore.java     |  12 +
 .../InMemoryTimeOrderedKeyValueBuffer.java         |  12 +
 .../streams/state/internals/MemoryLRUCache.java    |  12 +
 .../state/internals/MeteredKeyValueStore.java      |  41 ++
 .../state/internals/MeteredSessionStore.java       |   4 +
 .../state/internals/MeteredWindowStore.java        | 117 ++++--
 .../state/internals/QueryableStoreProvider.java    |   5 +
 .../streams/state/internals/RocksDBStore.java      |  42 ++
 .../streams/state/internals/StoreQueryUtils.java   | 122 ++++++
 .../internals/TimestampedKeyValueStoreBuilder.java |  15 +
 .../internals/TimestampedWindowStoreBuilder.java   |  15 +
 .../state/internals/ValueAndTimestampSerde.java    |  10 +
 .../WindowToTimestampedWindowByteStoreAdapter.java |  12 +
 .../streams/state/internals/WrappedStateStore.java |  18 +
 .../streams/integration/IQv2IntegrationTest.java   | 460 +++++++++++++++++++++
 .../streams/state/internals/RocksDBStoreTest.java  |   3 +
 32 files changed, 1867 insertions(+), 86 deletions(-)
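
Before the diff itself, a minimal sketch of how the proposed API is meant to be exercised from client code. The class name, store name, and key/value types here are illustrative assumptions, not part of this commit:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.query.InteractiveQueryRequest;
    import org.apache.kafka.streams.query.InteractiveQueryResult;
    import org.apache.kafka.streams.query.KeyQuery;
    import org.apache.kafka.streams.query.QueryResult;

    public class Iqv2UsageSketch {
        // Looks up one key in a hypothetical key-value store named "my-store".
        public static Integer lookup(final KafkaStreams streams, final String key) {
            final InteractiveQueryRequest<Integer> request =
                InteractiveQueryRequest
                    .inStore("my-store")
                    .withQuery(KeyQuery.<String, Integer>withKey(key));

            final InteractiveQueryResult<Integer> result = streams.query(request);

            // A fresh request requires a running active task, so at most one
            // partition produces a value. getOnlyPartitionResult() throws if
            // there is not exactly one non-null partition result.
            final QueryResult<Integer> partitionResult = result.getOnlyPartitionResult();
            return partitionResult.getResult();
        }
    }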

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bb0c40a..2356096 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -40,29 +41,38 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsNotStartedException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.UnknownStateStoreException;
-import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ClientUtils;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.InteractiveQueryRequest;
+import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredSessionStore;
+import org.apache.kafka.streams.state.internals.MeteredWindowStore;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
@@ -78,17 +88,18 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -1716,4 +1727,112 @@ public class KafkaStreams implements AutoCloseable {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) {
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
+
+        // TODO this is a hack. We ought to be able to create the serdes independent of the
+        // TODO stores and cache them in the topology.
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            return getSerdes(store);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final StateStore store = entry.getValue().getStore(storeName);
+                    if (store != null) {
+                        return getSerdes(store);
+                    }
+                }
+            }
+        }
+        // there may be no local copy of this store.
+        // This is the main reason I want to decouple serde
+        // creation from the store itself.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) {
+        if (store instanceof MeteredKeyValueStore) {
+            return ((MeteredKeyValueStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredSessionStore) {
+            return ((MeteredSessionStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredWindowStore) {
+            return ((MeteredWindowStore<K, V>) store).serdes();
+        } else {
+            throw new IllegalArgumentException("Unknown store type: " + store);
+        }
+    }
+
+    @Evolving
+    public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
+        final InteractiveQueryResult<R> result = new InteractiveQueryResult<>(new HashMap<>());
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.notActive(
+                                        state,
+                                        active,
+                                        partition
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 76d1ab4..b1c480f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -19,6 +19,10 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
@@ -119,4 +123,25 @@ public interface StateStore {
      * @return {@code true} if the store is open
      */
     boolean isOpen();
+
+    /**
+     * Execute a query. Returns a QueryResult containing either result data or
+     * a failure.
+     * <p>
+     * If the store doesn't know how to handle the given query, the result
+     * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+     * If the store couldn't satisfy the given position bound, the result
+     * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * @param query The query to execute
+     * @param positionBound The position the store must be at or past
+     * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
+     * @param <R> The result type
+     */
+    default <R> QueryResult<R> query(
+        Query<R> query,
+        PositionBound positionBound,
+        boolean collectExecutionInfo) {
+
+        return QueryResult.forUnknownQueryType(query, this);
+    }
 }
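
To make this extension point concrete, here is a sketch of a custom store overriding the default method: it serves RawKeyQuery and falls back to the unknown-type failure for everything else, mirroring the pattern AbstractRocksDBSegmentedBytesStore uses later in this diff. The class and its get() method are hypothetical:

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.query.PositionBound;
    import org.apache.kafka.streams.query.Query;
    import org.apache.kafka.streams.query.QueryResult;
    import org.apache.kafka.streams.query.RawKeyQuery;

    public abstract class SketchByteStore implements StateStore {

        // Hypothetical point lookup provided by the concrete store.
        protected abstract byte[] get(Bytes key);

        @SuppressWarnings("unchecked")
        @Override
        public <R> QueryResult<R> query(
            final Query<R> query,
            final PositionBound positionBound,
            final boolean collectExecutionInfo) {

            if (query instanceof RawKeyQuery) {
                final Bytes key = ((RawKeyQuery) query).getKey();
                // RawKeyQuery implements Query<byte[]>, so R is byte[] here.
                return QueryResult.forResult((R) get(key));
            }
            // Same failure the default implementation returns.
            return QueryResult.forUnknownQueryType(query, this);
        }
    }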
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
index 50d5879..f6f1446 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
@@ -46,8 +46,8 @@ public interface StateStoreContext {
 
     /**
      * Return the metadata of the current topic/partition/offset if available.
-     * This is defined as the metadata of the record that is currently been
-     * processed by the StreamTask that holds the store.
+     * This is defined as the metadata of the record that is currently being
+     * processed (or was last processed) by the StreamTask that holds the store.
      * <p>
      * Note that the metadata is not defined during all store interactions, for
      * example, while the StreamTask is running a punctuation.
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
new file mode 100644
index 0000000..02db7fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+public enum FailureReason {
+    /**
+     * Failure indicating that the store doesn't know how to handle the given query.
+     */
+    UNKNOWN_QUERY_TYPE,
+
+    /**
+     * Failure indicating that the store partition is not (yet) up to the desired bound.
+     * The caller should either try again later or try a different replica.
+     */
+    NOT_UP_TO_BOUND,
+
+    /**
+     * Failure indicating that the requested store partition is not present on the local
+     * KafkaStreams instance. It may have been migrated to another instance during a rebalance.
+     * The caller is recommended to try a different replica.
+     */
+    NOT_PRESENT,
+
+    /**
+     * The requested store partition does not exist at all. For example, partition 4 was requested,
+     * but the store in question only has 4 partitions (0 through 3).
+     */
+    DOES_NOT_EXIST;
+}
\ No newline at end of file
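
As a sketch of how a caller might react to these reasons (the retry and re-routing decisions are application-specific assumptions):

    import org.apache.kafka.streams.query.QueryResult;

    public final class FailureHandlingSketch {
        // Hypothetical handler, invoked only for failed per-partition results.
        public static <R> void handle(final QueryResult<R> queryResult) {
            switch (queryResult.getFailureReason()) {
                case NOT_UP_TO_BOUND: // not caught up yet: retry later
                case NOT_PRESENT:     // migrated away: try another replica
                    // application-specific retry / re-routing goes here
                    break;
                case UNKNOWN_QUERY_TYPE: // the store cannot serve this query
                case DOES_NOT_EXIST:     // the partition number is out of range
                default:
                    throw new IllegalStateException(queryResult.getFailure());
            }
        }
    }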
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
new file mode 100644
index 0000000..56fc0a8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * @param <R> The result type
+ */
+public class InteractiveQueryRequest<R> {
+
+    private final String storeName;
+    private final PositionBound position;
+    private final Optional<Set<Integer>> partitions;
+    private final Query<R> query;
+    private boolean executionInfoEnabled;
+    private boolean requireActive;
+
+    private InteractiveQueryRequest(
+        final String storeName,
+        final PositionBound position,
+        final Optional<Set<Integer>> partitions,
+        final Query<R> query,
+        final boolean executionInfoEnabled, final boolean requireActive) {
+
+        this.storeName = storeName;
+        this.position = position;
+        this.partitions = partitions;
+        this.query = query;
+        this.executionInfoEnabled = executionInfoEnabled;
+        this.requireActive = requireActive;
+    }
+
+    public static InStore inStore(final String name) {
+        return new InStore(name);
+    }
+
+    public InteractiveQueryRequest<R> withPositionBound(final PositionBound positionBound) {
+        return new InteractiveQueryRequest<>(
+            storeName,
+            positionBound,
+            partitions,
+            query,
+            executionInfoEnabled,
+            false);
+    }
+
+
+    public InteractiveQueryRequest<R> withNoPartitions() {
+        return new InteractiveQueryRequest<>(storeName,
+            position,
+            Optional.of(Collections.emptySet()),
+            query,
+            executionInfoEnabled,
+            requireActive);
+    }
+
+    public InteractiveQueryRequest<R> withAllPartitions() {
+        return new InteractiveQueryRequest<>(storeName,
+            position,
+            Optional.empty(),
+            query,
+            executionInfoEnabled,
+            requireActive);
+    }
+
+    public InteractiveQueryRequest<R> withPartitions(final Set<Integer> partitions) {
+        return new InteractiveQueryRequest<>(storeName,
+            position,
+            Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))),
+            query,
+            executionInfoEnabled,
+            requireActive);
+    }
+
+    public String getStoreName() {
+        return storeName;
+    }
+
+    public PositionBound getPositionBound() {
+        if (requireActive) {
+            throw new IllegalArgumentException("Cannot get a position bound: this request requires an active task.");
+        }
+        return Objects.requireNonNull(position);
+    }
+
+    public Query<R> getQuery() {
+        return query;
+    }
+
+    public boolean isAllPartitions() {
+        return !partitions.isPresent();
+    }
+
+    public Set<Integer> getPartitions() {
+        if (!partitions.isPresent()) {
+            throw new UnsupportedOperationException(
+                "Cannot list partitions of an 'all partitions' request");
+        } else {
+            return partitions.get();
+        }
+    }
+
+    public InteractiveQueryRequest<R> enableExecutionInfo() {
+        return new InteractiveQueryRequest<>(storeName,
+            position,
+            partitions,
+            query,
+            true,
+            requireActive);
+    }
+
+    public boolean executionInfoEnabled() {
+        return executionInfoEnabled;
+    }
+
+    public boolean isRequireActive() {
+        return requireActive;
+    }
+
+    public static class InStore {
+
+        private final String name;
+
+        private InStore(final String name) {
+            this.name = name;
+        }
+
+        public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query) {
+            return new InteractiveQueryRequest<>(
+                name,
+                null,
+                Optional.empty(),
+                query,
+                false,
+                true);
+        }
+    }
+}
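
Note that withQuery sets the "require active" flag, while withPositionBound clears it again. A sketch of building a request with an explicit bound and partition set (the store name and key are illustrative):

    import java.util.Collections;
    import org.apache.kafka.streams.query.InteractiveQueryRequest;
    import org.apache.kafka.streams.query.KeyQuery;
    import org.apache.kafka.streams.query.PositionBound;

    public final class RequestBuildingSketch {
        public static InteractiveQueryRequest<Long> build() {
            return InteractiveQueryRequest
                .inStore("counts")
                .withQuery(KeyQuery.<String, Long>withKey("a"))
                // Relaxes the default "require active" flag set by withQuery.
                .withPositionBound(PositionBound.unbounded())
                .withPartitions(Collections.singleton(2))
                .enableExecutionInfo();
        }
    }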
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
new file mode 100644
index 0000000..4f8e728
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class InteractiveQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults;
+
+    public InteractiveQueryResult(final Map<Integer, QueryResult<R>> resultMap) {
+        partitionResults = resultMap;
+    }
+
+    public void setGlobalResult(final QueryResult<R> r) {
+        // POC stub: the global store result is not retained yet.
+    }
+
+    public void addResult(final int partition, final QueryResult<R> r) {
+        partitionResults.put(partition, r);
+    }
+
+    public Map<Integer, QueryResult<R>> getPartitionResults() {
+        return partitionResults;
+    }
+
+    public QueryResult<R> getOnlyPartitionResult() {
+        final List<QueryResult<R>> nonempty =
+            partitionResults
+                .values()
+                .stream()
+                .filter(r -> r.getResult() != null)
+                .collect(Collectors.toList());
+
+        if (nonempty.size() != 1) {
+            throw new IllegalStateException("Expected exactly one non-empty partition result, but got: " + nonempty.size());
+        } else {
+            return nonempty.get(0);
+        }
+    }
+
+    public Position getPosition() {
+        Position position = Position.emptyPosition();
+        for (final QueryResult<R> r : partitionResults.values()) {
+            position = position.merge(r.getPosition());
+        }
+        return position;
+    }
+
+    @Override
+    public String toString() {
+        return "InteractiveQueryResult{" +
+            "partitionResults=" + partitionResults +
+            '}';
+    }
+}
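
When a request fans out over several partitions, the per-partition results can also be walked individually, and getPosition() merges the positions each partition reported. A sketch, assuming a result from a query like the one above:

    import java.util.Map.Entry;
    import org.apache.kafka.streams.query.InteractiveQueryResult;
    import org.apache.kafka.streams.query.Position;
    import org.apache.kafka.streams.query.QueryResult;

    public final class ResultScanSketch {
        public static <R> void printAll(final InteractiveQueryResult<R> result) {
            for (final Entry<Integer, QueryResult<R>> entry :
                     result.getPartitionResults().entrySet()) {
                if (entry.getValue().isSuccess()) {
                    System.out.println("partition " + entry.getKey()
                        + " -> " + entry.getValue().getResult());
                }
            }
            // Componentwise max of all partitions' positions; could seed a
            // PositionBound for a later monotonic read.
            final Position overall = result.getPosition();
            System.out.println("position: " + overall);
        }
    }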
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java b/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java
new file mode 100644
index 0000000..d7ea7e9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.utils.CloseableIterator;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Objects;
+
+public final class Iterators {
+
+    private Iterators() {
+    }
+
+    public static <E, I extends Iterator<E>> CloseableIterator<E> collate(final Collection<I> iterators) {
+        return new CloseableIterator<E>() {
+            private final Deque<I> iteratorQueue = new LinkedList<>(iterators);
+
+            @Override
+            public void close() {
+                RuntimeException exception = null;
+                for (final I iterator : iterators) {
+                    if (iterator instanceof Closeable) {
+                        try {
+                            ((Closeable) iterator).close();
+                        } catch (final IOException e) {
+                            if (exception == null) {
+                                exception = new RuntimeException(
+                                    "Exception closing collated iterator", e);
+                            } else {
+                                exception.addSuppressed(e);
+                            }
+                        }
+                    }
+                }
+                if (exception != null) {
+                    throw exception;
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                for (int i = 0; i < iterators.size(); i++) {
+                    final Iterator<E> iterator = Objects.requireNonNull(iteratorQueue.peek());
+                    if (iterator.hasNext()) {
+                        return true;
+                    } else {
+                        iteratorQueue.offer(iteratorQueue.poll()); // rotate the exhausted iterator to the back
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public E next() {
+                final I iterator = iteratorQueue.poll();
+                final E next = Objects.requireNonNull(iterator).next();
+                iteratorQueue.push(iterator);
+                return next;
+            }
+        };
+    }
+}
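
A small usage sketch for collate: it drains each input iterator in turn, rotating exhausted iterators to the back, and close() forwards to any input that is Closeable:

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.kafka.common.utils.CloseableIterator;
    import org.apache.kafka.streams.query.Iterators;

    public final class CollateSketch {
        public static void main(final String[] args) throws Exception {
            final Iterator<Integer> p0 = Arrays.asList(1, 2).iterator();
            final Iterator<Integer> p1 = Arrays.asList(3, 4).iterator();
            // Prints 1, 2, 3, 4: each iterator is drained before the next one.
            try (CloseableIterator<Integer> all = Iterators.collate(Arrays.asList(p0, p1))) {
                while (all.hasNext()) {
                    System.out.println(all.next());
                }
            }
        }
    }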
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
similarity index 56%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
index 1936d29..3900739 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
@@ -14,19 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+public class KeyQuery<K, V> implements Query<V> {
 
-import static java.util.Objects.requireNonNull;
+    private final K key;
 
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
-    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        super(
-            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
-            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
-        );
+    private KeyQuery(final K key) {
+        this.key = key;
     }
-}
\ No newline at end of file
+
+    public static <K, V> KeyQuery<K, V> withKey(final K key) {
+        return new KeyQuery<>(key);
+    }
+
+    public K getKey() {
+        return key;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
new file mode 100644
index 0000000..5b0e981
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+public class Position {
+
+    private final Map<String, Map<Integer, Long>> position;
+
+    private Position(final Map<String, Map<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    public static Position emptyPosition() {
+        return new Position(new HashMap<>());
+    }
+
+    public static Position fromMap(final Map<String, Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        final Map<String, Map<Integer, Long>> updated = deepCopy(position);
+        updated.computeIfAbsent(topic, k -> new HashMap<>()).put(partition, offset);
+        return new Position(updated);
+    }
+
+    public Position merge(final Position other) {
+        if (other == null) {
+            return this;
+        } else {
+            final Map<String, Map<Integer, Long>> copy = deepCopy(position);
+            for (final Entry<String, Map<Integer, Long>> entry : other.position.entrySet()) {
+                final String topic = entry.getKey();
+                final Map<Integer, Long> partitionMap =
+                    copy.computeIfAbsent(topic, k -> new HashMap<>());
+                for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
+                    final Integer partition = partitionOffset.getKey();
+                    final Long offset = partitionOffset.getValue();
+                    if (!partitionMap.containsKey(partition)
+                        || partitionMap.get(partition) < offset) {
+                        partitionMap.put(partition, offset);
+                    }
+                }
+            }
+            return new Position(copy);
+        }
+    }
+
+    public Set<String> getTopics() {
+        return Collections.unmodifiableSet(position.keySet());
+    }
+
+    public Map<Integer, Long> getBound(final String topic) {
+        return Collections.unmodifiableMap(position.get(topic));
+    }
+
+    public ByteBuffer serialize() {
+        final byte version = (byte) 0;
+
+        int arraySize = Byte.BYTES; // version
+
+        final int nTopics = position.size();
+        arraySize += Integer.BYTES; // number of topics
+
+        final ArrayList<Entry<String, Map<Integer, Long>>> entries =
+            new ArrayList<>(position.entrySet());
+        final byte[][] topics = new byte[entries.size()][];
+
+        for (int i = 0; i < nTopics; i++) {
+            final Entry<String, Map<Integer, Long>> entry = entries.get(i);
+            final byte[] topicBytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
+            topics[i] = topicBytes;
+            arraySize += Integer.BYTES; // topic name length
+            arraySize += topicBytes.length; // topic name itself
+
+            final Map<Integer, Long> partitionOffsets = entry.getValue();
+            arraySize += Integer.BYTES; // Number of PartitionOffset pairs
+            arraySize += (Integer.BYTES + Long.BYTES)
+                * partitionOffsets.size(); // partitionOffsets themselves
+        }
+
+        final ByteBuffer buffer = ByteBuffer.allocate(arraySize);
+        buffer.put(version);
+
+        buffer.putInt(nTopics);
+        for (int i = 0; i < nTopics; i++) {
+            buffer.putInt(topics[i].length);
+            buffer.put(topics[i]);
+
+            final Entry<String, Map<Integer, Long>> entry = entries.get(i);
+            final Map<Integer, Long> partitionOffsets = entry.getValue();
+            buffer.putInt(partitionOffsets.size());
+            for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
+                buffer.putInt(partitionOffset.getKey());
+                buffer.putLong(partitionOffset.getValue());
+            }
+        }
+
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Position deserialize(final ByteBuffer buffer) {
+        final byte version = buffer.get();
+
+        switch (version) {
+            case (byte) 0:
+                final int nTopics = buffer.getInt();
+                final Map<String, Map<Integer, Long>> position = new HashMap<>(nTopics);
+                for (int i = 0; i < nTopics; i++) {
+                    final int topicNameLength = buffer.getInt();
+                    final byte[] topicNameBytes = new byte[topicNameLength];
+                    buffer.get(topicNameBytes);
+                    final String topic = new String(topicNameBytes, StandardCharsets.UTF_8);
+
+                    final int numPairs = buffer.getInt();
+                    final Map<Integer, Long> partitionOffsets = new HashMap<>(numPairs);
+                    for (int j = 0; j < numPairs; j++) {
+                        partitionOffsets.put(buffer.getInt(), buffer.getLong());
+                    }
+                    position.put(topic, partitionOffsets);
+                }
+                return Position.fromMap(position);
+            default:
+                throw new IllegalArgumentException(
+                    "Unknown version " + version + " when deserializing Position"
+                );
+        }
+    }
+
+    private static Map<String, Map<Integer, Long>> deepCopy(
+        final Map<String, Map<Integer, Long>> map) {
+        if (map == null) {
+            return new HashMap<>();
+        } else {
+            final Map<String, Map<Integer, Long>> copy = new HashMap<>(map.size());
+            for (final Entry<String, Map<Integer, Long>> entry : map.entrySet()) {
+                copy.put(entry.getKey(), new HashMap<>(entry.getValue()));
+            }
+            return copy;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Position{" +
+            "position=" + position +
+            '}';
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Position position1 = (Position) o;
+        return Objects.equals(position, position1.position);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(position);
+    }
+}
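
A quick sketch of the merge semantics above: merging unions the topics and keeps the maximum offset per topic-partition:

    import org.apache.kafka.streams.query.Position;

    public final class PositionMergeSketch {
        public static void main(final String[] args) {
            final Position a = Position.emptyPosition()
                .withComponent("topic-a", 0, 5L)
                .withComponent("topic-a", 1, 10L);
            final Position b = Position.emptyPosition()
                .withComponent("topic-a", 0, 7L)
                .withComponent("topic-b", 0, 1L);

            // Componentwise max: topic-a -> {0=7, 1=10}, topic-b -> {0=1}
            System.out.println(a.merge(b));
        }
    }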
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
new file mode 100644
index 0000000..a4dbe35
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import java.util.Objects;
+
+public class PositionBound {
+
+    private final Position position;
+    private final boolean unbounded;
+
+    private PositionBound(final Position position, final boolean unbounded) {
+        if (unbounded && position != null) {
+            throw new IllegalArgumentException("An unbounded PositionBound cannot also carry a position.");
+        }
+        this.position = position;
+        this.unbounded = unbounded;
+    }
+
+    public static PositionBound unbounded() {
+        return new PositionBound(null, true);
+    }
+
+    public static PositionBound at(final Position position) {
+        return new PositionBound(position, false);
+    }
+
+    public boolean isUnbounded() {
+        return unbounded;
+    }
+
+    public Position position() {
+        if (unbounded) {
+            throw new IllegalArgumentException("Cannot get the position of an unbounded PositionBound.");
+        } else {
+            return position;
+        }
+    }
+
+    @Override
+    public String toString() {
+        if (isUnbounded()) {
+            return "PositionBound{unbounded}";
+        } else {
+            return "PositionBound{position=" + position + '}';
+        }
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final PositionBound that = (PositionBound) o;
+        return unbounded == that.unbounded && Objects.equals(position, that.position);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(position, unbounded);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/Query.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/Query.java
index 1936d29..988f904 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Query.java
@@ -14,19 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 
-import static java.util.Objects.requireNonNull;
+public interface Query<R> {
 
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
-    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        super(
-            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
-            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
-        );
-    }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
new file mode 100644
index 0000000..f3b92d6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position boundUpdate;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute "
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new query type.");
+    }
+
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    public static <R> QueryResult<R> notActive(
+        final State state,
+        final boolean active,
+        final int partition) {
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "Query requires a running active task,"
+                + " but partition " + partition + " was in state " + state + " and was "
+                + (active ? "active" : "not active") + "."
+        );
+    }
+
+
+    public <NewR> QueryResult<NewR> swapResult(final NewR typedResult) {
+        final QueryResult<NewR> queryResult = new QueryResult<>(typedResult);
+        queryResult.executionInfo.addAll(executionInfo);
+        queryResult.boundUpdate = boundUpdate;
+        return queryResult;
+    }
+
+    public void addExecutionInfo(final String s) {
+        executionInfo.add(s);
+    }
+
+    public void throwIfFailure() {
+        if (isFailure()) {
+            throw new RuntimeException(failureReason.name() + ": " + failure);
+        }
+    }
+
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    public FailureReason getFailureReason() {
+        return failureReason;
+    }
+
+    public String getFailure() {
+        return failure;
+    }
+
+    public R getResult() {
+        if (result == null) {
+            throwIfFailure();
+        }
+        // if no failure is recorded either, a null result is returned as-is.
+        return result;
+    }
+
+    public void setPosition(final Position boundUpdate) {
+        this.boundUpdate = boundUpdate;
+    }
+
+    public Position getPosition() {
+        return boundUpdate;
+    }
+
+    @Override
+    public String toString() {
+        return "QueryResult{" +
+            "executionInfo=" + executionInfo +
+            ", failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", result=" + result +
+            ", boundUpdate=" + boundUpdate +
+            '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
index 1936d29..c42762e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
@@ -14,19 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.common.utils.Bytes;
 
-import static java.util.Objects.requireNonNull;
+public class RawKeyQuery implements Query<byte[]> {
 
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
-    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        super(
-            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
-            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
-        );
+    private final Bytes key;
+
+    private RawKeyQuery(final Bytes key) {
+        this.key = key;
+    }
+
+    public static RawKeyQuery withKey(final Bytes key) {
+        return new RawKeyQuery(key);
+    }
+
+    public static RawKeyQuery withKey(final byte[] key) {
+        return new RawKeyQuery(Bytes.wrap(key));
+    }
+
+    public Bytes getKey() {
+        return key;
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
index 1936d29..d1f14f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
@@ -14,19 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
 
-import static java.util.Objects.requireNonNull;
+public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
 
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
-    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        super(
-            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
-            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
-        );
+    private RawScanQuery() {}
+
+    public static RawScanQuery scan() {
+        return new RawScanQuery();
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index f9f0bdc..da7927e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -212,4 +212,13 @@ public final class StateSerdes<K, V> {
                     e);
         }
     }
+
+    @Override
+    public String toString() {
+        return "StateSerdes{" +
+            "topic='" + topic + '\'' +
+            ", keySerde=" + keySerde +
+            ", valueSerde=" + valueSerde +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index bfee6b2..c1f7461 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -27,6 +27,10 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
@@ -279,6 +283,20 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
         return open;
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+        if (query instanceof RawKeyQuery) {
+            final Bytes key = ((RawKeyQuery) query).getKey();
+            final byte[] bytes = get(key);
+            return QueryResult.forResult((R) bytes);
+        } else {
+            return QueryResult.forUnknownQueryType(query, this);
+        }
+    }
+
     // Visible for testing
     List<S> getSegments() {
         return segments.allSegments(false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index f0c6dbe..f9bc34e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.slf4j.Logger;
@@ -73,6 +76,15 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return StoreQueryUtils.requireKVQuery(query, this, collectExecutionInfo);
+    }
+
+    @Override
     public synchronized byte[] get(final Bytes key) {
         return map.get(key);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index ba8a745..e13b357 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -40,6 +40,9 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult;
@@ -240,6 +243,15 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     }
 
     @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return QueryResult.forUnknownQueryType(query, this);
+    }
+
+    @Override
     public void close() {
         open = false;
         index.clear();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 22f1215..1bea349 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -22,6 +22,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -108,6 +111,15 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return StoreQueryUtils.requireKVQuery(query, this, collectExecutionInfo);
+    }
+
+    @Override
     public synchronized byte[] get(final Bytes key) {
         Objects.requireNonNull(key);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 937288c..21b8e38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -34,6 +34,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -186,6 +191,38 @@ public class MeteredKeyValueStore<K, V>
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        if (query instanceof KeyQuery) {
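+            // Translate the typed KeyQuery into a RawKeyQuery over the serialized key bytes,
+            // delegate to the wrapped byte store, and deserialize any result on the way back.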
+            final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+            final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(keyBytes(typedQuery.getKey()));
+            final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+            if (rawResult.isSuccess()) {
+                final V value = outerValue(rawResult.getResult());
+                final QueryResult<V> typedQueryResult =
+                    rawResult.swapResult(value);
+                result = (QueryResult<R>) typedQueryResult;
+            } else {
+                // the generic type doesn't matter, since failed queries have no result set.
+                result = (QueryResult<R>) rawResult;
+            }
+        } else {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+        }
+        if (collectExecutionInfo) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " with serdes " + serdes + " in " + (end - start)
+                    + "ns");
+        }
+        return result;
+    }
+
     @Override
     public V get(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
@@ -324,6 +361,10 @@ public class MeteredKeyValueStore<K, V>
         }
     }
 
+    public StateSerdes<K, V> serdes() {
+        return serdes;
+    }
+
     private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
 
         private final KeyValueIterator<Bytes, byte[]> iter;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index b1eb948..87b3b40 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -372,4 +372,8 @@ public class MeteredSessionStore<K, V>
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
+
+    public StateSerdes<K, V> serdes() {
+        return serdes;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 0970703..1f267bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -33,6 +33,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -63,11 +68,11 @@ public class MeteredWindowStore<K, V>
     private TaskId taskId;
 
     MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
-                       final long windowSizeMs,
-                       final String metricsScope,
-                       final Time time,
-                       final Serde<K> keySerde,
-                       final Serde<V> valueSerde) {
+        final long windowSizeMs,
+        final String metricsScope,
+        final Time time,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde) {
         super(inner);
         this.windowSizeMs = windowSizeMs;
         this.metricsScope = metricsScope;
@@ -87,7 +92,8 @@ public class MeteredWindowStore<K, V>
 
         registerMetrics();
         final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(),
+                streamsMetrics);
 
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
@@ -103,20 +109,26 @@ public class MeteredWindowStore<K, V>
 
         registerMetrics();
         final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(),
+                streamsMetrics);
 
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
+
     protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final SerdeGetter getter) {
         return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
     }
 
     private void registerMetrics() {
-        putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
-        fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
-        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+        putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(),
+            streamsMetrics);
+        fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(),
+            streamsMetrics);
+        flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(),
+            streamsMetrics);
+        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope,
+            name(), streamsMetrics);
     }
 
     @Deprecated
@@ -126,7 +138,8 @@ public class MeteredWindowStore<K, V>
         serdes = new StateSerdes<>(
             changelogTopic != null ?
                 changelogTopic :
-                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName,
+                    taskId.topologyName()),
             prepareKeySerde(keySerde, new SerdeGetter(context)),
             prepareValueSerde(valueSerde, new SerdeGetter(context)));
     }
@@ -137,7 +150,8 @@ public class MeteredWindowStore<K, V>
         serdes = new StateSerdes<>(
             changelogTopic != null ?
                 changelogTopic :
-                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName,
+                    taskId.topologyName()),
             prepareKeySerde(keySerde, new SerdeGetter(context)),
             prepareValueSerde(valueSerde, new SerdeGetter(context)));
     }
@@ -145,7 +159,7 @@ public class MeteredWindowStore<K, V>
     @SuppressWarnings("unchecked")
     @Override
     public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
-                                    final boolean sendOldValues) {
+        final boolean sendOldValues) {
         final WindowStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
@@ -161,10 +175,43 @@ public class MeteredWindowStore<K, V>
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo
+    ) {
+        if (query instanceof KeyQuery) {
+            final Windowed<K> key = ((KeyQuery<Windowed<K>, V>) query).getKey();
+            final Bytes bytes = keyBytes(key.key());
+            // NOTE: we need to _fully_ serialize the key here, since RawKeyQuery can't carry
+            // any extra timestamp information. So we compose the key using the internal
+            // store's binary schema. This works for the windowed stores that Kafka Streams
+            // provides, but a custom store that uses a different schema will have to use
+            // WindowKeySchema to read the data back out of the array and convert it to
+            // whatever its real binary key is.
+            // The seqnum is hard-coded to zero, since we don't query stream-stream join stores.
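+            // With the built-in WindowKeySchema, the composed store key layout is:
+            // [serialized key bytes][8-byte big-endian window start][4-byte seqnum].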
+            final Bytes storeKey = WindowKeySchema.toStoreKeyBinary(
+                bytes,
+                key.window().start(),
+                0
+            );
+            final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(storeKey);
+            final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+            if (rawResult.isSuccess()) {
+                final V v = serdes.valueFrom(rawResult.getResult());
+                final QueryResult<V> result = rawResult.swapResult(v);
+                return (QueryResult<R>) result;
+            } else {
+                // failed queries carry no result, so the generic type doesn't matter.
+                return (QueryResult<R>) rawResult;
+            }
+        } else {
+            return super.query(query, positionBound, collectExecutionInfo);
+        }
+    }
+
     @Override
     public void put(final K key,
-                    final V value,
-                    final long windowStartTimestamp) {
+        final V value,
+        final long windowStartTimestamp) {
         Objects.requireNonNull(key, "key cannot be null");
         try {
             maybeMeasureLatency(
@@ -181,7 +228,7 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public V fetch(final K key,
-                   final long timestamp) {
+        final long timestamp) {
         Objects.requireNonNull(key, "key cannot be null");
         return maybeMeasureLatency(
             () -> {
@@ -198,8 +245,8 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public WindowStoreIterator<V> fetch(final K key,
-                                        final long timeFrom,
-                                        final long timeTo) {
+        final long timeFrom,
+        final long timeTo) {
         Objects.requireNonNull(key, "key cannot be null");
         return new MeteredWindowStoreIterator<>(
             wrapped().fetch(keyBytes(key), timeFrom, timeTo),
@@ -212,8 +259,8 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public WindowStoreIterator<V> backwardFetch(final K key,
-                                                final long timeFrom,
-                                                final long timeTo) {
+        final long timeFrom,
+        final long timeTo) {
         Objects.requireNonNull(key, "key cannot be null");
         return new MeteredWindowStoreIterator<>(
             wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo),
@@ -226,9 +273,9 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
-                                                  final K keyTo,
-                                                  final long timeFrom,
-                                                  final long timeTo) {
+        final K keyTo,
+        final long timeFrom,
+        final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().fetch(
                 keyBytes(keyFrom),
@@ -243,9 +290,9 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
-                                                          final K keyTo,
-                                                          final long timeFrom,
-                                                          final long timeTo) {
+        final K keyTo,
+        final long timeFrom,
+        final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().backwardFetch(
                 keyBytes(keyFrom),
@@ -260,7 +307,7 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
-                                                     final long timeTo) {
+        final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().fetchAll(timeFrom, timeTo),
             fetchSensor,
@@ -271,7 +318,7 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom,
-                                                             final long timeTo) {
+        final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().backwardFetchAll(timeFrom, timeTo),
             fetchSensor,
@@ -282,12 +329,14 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
-        return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
+        return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics,
+            serdes, time);
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardAll() {
-        return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor, streamsMetrics, serdes, time);
+        return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor,
+            streamsMetrics, serdes, time);
     }
 
     @Override
@@ -313,8 +362,12 @@ public class MeteredWindowStore<K, V>
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && context != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency =  currentTime - context.timestamp();
+            final long e2eLatency = currentTime - context.timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
+
+    public StateSerdes<K, V> serdes() {
+        return serdes;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 07cf0ee..aedb67c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,4 +72,8 @@ public class QueryableStoreProvider {
     public void removeStoreProviderForThread(final String threadName) {
         this.storeProviders.remove(threadName);
     }
+
+    public Collection<StreamThreadStateStoreProvider> getStoreProviders() {
+        return storeProviders.values();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index aa1b1ba..e9ea595 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -28,6 +28,11 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -61,6 +66,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -105,6 +111,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     private final RocksDBMetricsRecorder metricsRecorder;
 
     protected volatile boolean open = false;
+    private StateStoreContext context;
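+    // Tracks the highest offset this store has seen for each input topic-partition, as
+    // reported by the record metadata on put(). Used to evaluate IQv2 position bounds.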
+    private final Map<String, Map<Integer, Long>> seenOffsets = new HashMap<>();
 
     RocksDBStore(final String name,
                  final String metricsScope) {
@@ -252,6 +260,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
         context.register(root, new RocksDBBatchingRestoreCallback(this));
+        this.context = context;
     }
 
     @Override
@@ -269,6 +278,29 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         return open;
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
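+        // Once initialized, only serve the query if this partition's seen offsets have
+        // reached the requested position bound; otherwise report NOT_UP_TO_BOUND so the
+        // caller can retry once the store catches up.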
+        if (context == null) {
+            throw new IllegalStateException("Store is not yet initialized");
+        } else {
+            final int partition = this.context.taskId().partition();
+            if (StoreQueryUtils.isPermitted(seenOffsets, positionBound, partition)) {
+                final QueryResult<R> result = StoreQueryUtils.requireKVQuery(query, this,
+                    collectExecutionInfo);
+                final Position currentPosition = Position.fromMap(seenOffsets);
+                result.setPosition(currentPosition);
+                return result;
+            } else {
+                return QueryResult.notUpToBound(Position.fromMap(seenOffsets), positionBound,
+                    partition);
+            }
+        }
+    }
+
     private void validateStoreOpen() {
         if (!open) {
             throw new InvalidStateStoreException("Store " + name + " is currently closed");
@@ -281,6 +313,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         dbAccessor.put(key.get(), value);
+        // FIXME: the context (and therefore the record metadata) can be null, because when
+        // this store is used as a Segment, we never call init(). Is that correct?
+        // To make this logic work properly for segmented stores, we either need to track
+        // the seen offsets one level up (in the RocksDBSegmentedBytesStore), or we need to
+        // get a reference to the context here.
+        if (context != null && context.recordMetadata().isPresent()) {
+            final RecordMetadata meta = context.recordMetadata().get();
+            seenOffsets.computeIfAbsent(meta.topic(), t -> new HashMap<>())
+                .put(meta.partition(), meta.offset());
+        }
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
new file mode 100644
index 0000000..603257c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Map;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    public static <R> QueryResult<R> requireKVQuery(
+        final Query<R> query,
+        final KeyValueStore<Bytes, byte[]> kvStore,
+        final boolean enableExecutionInfo) {
+        final QueryResult<R> r = StoreQueryUtils.handleKVQuery(query, kvStore, enableExecutionInfo);
+        r.throwIfFailure();
+        return r;
+    }
+
+    public static <R> QueryResult<R> handleKVQuery(
+        final Query<R> query,
+        final KeyValueStore<Bytes, byte[]> kvStore,
+        final boolean enableExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final String name = query.getClass().getCanonicalName();
+        switch (name) {
+            case "org.apache.kafka.streams.query.RawKeyQuery": {
+                final RawKeyQuery keyQuery = (RawKeyQuery) query;
+                return handleRawKeyQuery(kvStore, enableExecutionInfo, start, keyQuery);
+            }
+            case "org.apache.kafka.streams.query.RawScanQuery": {
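+                // The raw scan returns the store's iterator directly as the query result;
+                // the caller takes ownership of the iterator and must close it.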
+                final KeyValueIterator<Bytes, byte[]> iterator = kvStore.all();
+                @SuppressWarnings("unchecked") final R result = (R) iterator;
+                final long end = System.nanoTime();
+                final QueryResult<R> queryResult = QueryResult.forResult(result);
+                if (enableExecutionInfo) {
+                    queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+                        + "#all via StoreQueryUtils in " + (end - start) + "ns");
+                }
+                return queryResult;
+            }
+            default:
+                return QueryResult.forUnknownQueryType(query, kvStore);
+        }
+    }
+
+    public static <R> QueryResult<R> handleRawKeyQuery(
+        final KeyValueStore<Bytes, byte[]> kvStore,
+        final boolean enableExecutionInfo,
+        final long start,
+        final RawKeyQuery keyQuery) {
+
+        final Bytes key = keyQuery.getKey();
+        final byte[] value = kvStore.get(key);
+        @SuppressWarnings("unchecked") final R result = (R) value;
+        final long end = System.nanoTime();
+
+        final QueryResult<R> queryResult = QueryResult.forResult(result);
+        if (enableExecutionInfo) {
+            queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+                + "#get via StoreQueryUtils in " + (end - start) + "ns");
+        }
+        return queryResult;
+    }
+
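+    /**
+     * Checks whether a query is permitted under the given position bound: returns true iff,
+     * for every topic in the bound that constrains this partition, the store has already
+     * seen an offset at or beyond the bounded offset. An unbounded position always passes.
+     */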
+    public static boolean isPermitted(
+        final Map<String, Map<Integer, Long>> seenOffsets,
+        final PositionBound positionBound,
+        final int partition) {
+        if (positionBound.isUnbounded()) {
+            return true;
+        } else {
+            final Position position = positionBound.position();
+            for (final String topic : position.getTopics()) {
+                final Map<Integer, Long> partitionBounds = position.getBound(topic);
+                if (!partitionBounds.containsKey(partition)) {
+                    // this topic isn't bounded for our partition, so just skip over it.
+                    continue;
+                }
+                final Map<Integer, Long> seenPartitionBounds = seenOffsets.get(topic);
+                if (seenPartitionBounds == null) {
+                    // we haven't seen any records from a topic that bounds our partition
+                    return false;
+                } else if (!seenPartitionBounds.containsKey(partition)) {
+                    // we haven't seen any records from a partition that we have a bound for
+                    return false;
+                } else if (seenPartitionBounds.get(partition) < partitionBounds.get(partition)) {
+                    // our current position is behind the bound
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index a249a14..f4eb55e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -185,6 +188,18 @@ public class TimestampedKeyValueStoreBuilder<K, V>
         }
 
         @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo) {
+
+            final long start = System.nanoTime();
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                final long end = System.nanoTime();
+                result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+            }
+            return result;
+        }
+
+        @Override
         public String name() {
             return wrapped.name();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index b3727f5..9b28ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -203,6 +206,18 @@ public class TimestampedWindowStoreBuilder<K, V>
         }
 
         @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo) {
+
+            final long start = System.nanoTime();
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                final long end = System.nanoTime();
+                result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+            }
+            return result;
+        }
+
+        @Override
         public String name() {
             return wrapped.name();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index 1936d29..599b6e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -23,10 +23,20 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import static java.util.Objects.requireNonNull;
 
 public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    private final Serde<V> valueSerde;
+
     public ValueAndTimestampSerde(final Serde<V> valueSerde) {
         super(
             new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
             new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
         );
+        this.valueSerde = valueSerde;
+    }
+
+    @Override
+    public String toString() {
+        return "ValueAndTimestampSerde{" +
+            "valueSerde=" + valueSerde +
+            '}';
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index f7999d3..2b626cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -21,6 +21,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -183,6 +186,15 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
         return store.isOpen();
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
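+        // This adapter rewrites values between the plain and timestamped binary formats, so
+        // it can't safely pass raw queries through to the inner store; report the query type
+        // as unknown instead.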
+        return QueryResult.forUnknownQueryType(query, this);
+    }
+
 
     private static class WindowToTimestampedWindowIteratorAdapter
         extends KeyValueToTimestampedKeyValueIteratorAdapter<Long>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index e8244f7..e904a48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -20,6 +20,9 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 
 /**
@@ -103,6 +106,21 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
         wrapped.close();
     }
 
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
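+        // Default implementation: delegate to the wrapped store, and note this layer in the
+        // execution trace so the path a query took through the wrappers stays visible.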
+        final long start = System.nanoTime();
+        final QueryResult<R> result = wrapped().query(query, positionBound, collectExecutionInfo);
+        if (collectExecutionInfo) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " via WrappedStateStore in " + (end - start) + "ns");
+        }
+        return result;
+    }
+
     public S wrapped() {
         return wrapped;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
new file mode 100644
index 0000000..23374f3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.InteractiveQueryRequest;
+import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.Iterators;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.query.RawScanQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.query.InteractiveQueryRequest.inStore;
+import static org.apache.kafka.streams.query.PositionBound.at;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String INPUT2_TOPIC_NAME = "input2-topic";
+    private static final String UNCACHED_TABLE = "uncached-table";
+    private static final String UNCACHED_COUNTS_TABLE = "uncached-counts-table";
+    private static final String CACHED_TABLE = "cached-table";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+    private static KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void before() throws InterruptedException, IOException {
+        CLUSTER.start();
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1);
+        CLUSTER.createTopic(INPUT2_TOPIC_NAME, 2, 1);
+
+        final Semaphore semaphore = new Semaphore(0);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder
+            .table(
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(UNCACHED_TABLE)
+                    .withCachingDisabled()
+            )
+            .filter(
+                (a, b) -> true,
+                Materialized.as(CACHED_TABLE)
+            )
+            .toStream()
+            .peek((k, v) -> semaphore.release());
+
+        builder
+            .stream(
+                INPUT2_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer())
+            )
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeAndGrace(
+                Duration.ofMillis(100),
+                Duration.ZERO
+            ))
+            .count(
+                Materialized
+                    .<Integer, Long, WindowStore<Bytes, byte[]>>as(UNCACHED_COUNTS_TABLE)
+                    .withCachingDisabled()
+            )
+            .toStream()
+            .peek((k, v) -> semaphore.release());
+
+        kafkaStreams =
+            IntegrationTestUtils.getRunningStreams(streamsConfiguration(), builder, true);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            INPUT_TOPIC_NAME,
+            Arrays.asList(new KeyValue<>(1, 1), new KeyValue<>(2, 2), new KeyValue<>(3, 3)),
+            producerProps,
+            Time.SYSTEM
+        );
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(3, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+        IntegrationTestUtils.produceSynchronously(
+            producerProps,
+            false,
+            INPUT2_TOPIC_NAME,
+            Optional.empty(),
+            Arrays.asList(
+                new KeyValueTimestamp<>(1, 1, 0),
+                new KeyValueTimestamp<>(1, 1, 10)
+            )
+        );
+
+        // Assert that we processed the second batch (should see both updates, since caching is disabled)
+        assertThat(semaphore.tryAcquire(2, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+    }
+
+    @AfterClass
+    public static void after() {
+        kafkaStreams.close(Duration.of(1, ChronoUnit.MINUTES));
+        kafkaStreams.cleanUp();
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void shouldQueryKeyFromCachedTable() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(CACHED_TABLE);
+
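+        // RawKeyQuery operates on serialized keys, so we use the store's serdes to produce
+        // the raw key here and to deserialize the raw result below.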
+        final byte[] rawKey = serdes.rawKey(1);
+        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+            inStore(CACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
+
+        System.out.println("|||" + result);
+        final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+        final ValueAndTimestamp<Integer> value =
+            serdes.valueFrom(rawValueResult.getResult());
+        System.out.println("|||" + value);
+
+        assertThat(value.value(), is(1));
+        assertThat(result.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldQueryKeyFromUncachedTable() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final byte[] rawKey = serdes.rawKey(1);
+        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+            inStore(UNCACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
+
+        System.out.println("|||" + result);
+        final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+        final ValueAndTimestamp<Integer> value =
+            serdes.valueFrom(rawValueResult.getResult());
+        System.out.println("|||" + value);
+
+        assertThat(value.value(), is(1));
+        assertThat(result.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldQueryTypedKeyFromUncachedTable() {
+        final Integer key = 1;
+
+        final InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
+            inStore(UNCACHED_TABLE).withQuery(KeyQuery.withKey(key));
+
+        final InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);
+
+        final ValueAndTimestamp<Integer> value = result.getOnlyPartitionResult().getResult();
+
+        assertThat(value.value(), is(1));
+        assertThat(result.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void exampleKeyQueryIntoWindowStore() {
+        final Windowed<Integer> key = new Windowed<>(1, new TimeWindow(0L, 99L));
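+        // Only the window start is used to build the store key, so both records for key 1
+        // (at timestamps 0 and 10) land in this window, and the expected count is 2.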
+
+        final InteractiveQueryRequest<ValueAndTimestamp<Long>> query =
+            inStore(UNCACHED_COUNTS_TABLE).withQuery(KeyQuery.withKey(key));
+
+        final InteractiveQueryResult<ValueAndTimestamp<Long>> result = kafkaStreams.query(query);
+
+        final ValueAndTimestamp<Long> value = result.getOnlyPartitionResult().getResult();
+
+        assertThat(value.value(), is(2L));
+    }
+
+    @Test
+    public void shouldScanUncachedTablePartitions() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+            kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
+
+        System.out.println("|||" + scanResult);
+        final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
+            scanResult.getPartitionResults();
+        for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
+            try (final KeyValueIterator<Bytes, byte[]> keyValueIterator =
+                entry.getValue().getResult()) {
+                while (keyValueIterator.hasNext()) {
+                    final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
+                    System.out.println(
+                        "|||" + entry.getKey() +
+                            " " + serdes.keyFrom(next.key.get()) +
+                            " " + serdes.valueFrom(next.value)
+                    );
+                }
+            }
+        }
+
+        assertThat(scanResult.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldScanUncachedTableCollated() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+            kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
+
+        System.out.println("|||" + scanResult);
+        final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = scanResult.getPartitionResults();
+
+        final List<KeyValueIterator<Bytes, byte[]>> collect =
+            partitionResults
+                .values()
+                .stream()
+                .map(QueryResult::getResult)
+                .collect(Collectors.toList());
+        try (final CloseableIterator<KeyValue<Bytes, byte[]>> collate = Iterators.collate(
+            collect)) {
+            while (collate.hasNext()) {
+                final KeyValue<Bytes, byte[]> next = collate.next();
+                System.out.println(
+                    "|||" +
+                        " " + serdes.keyFrom(next.key.get()) +
+                        " " + serdes.valueFrom(next.value)
+                );
+            }
+        }
+
+        assertThat(scanResult.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldQueryWithinBound() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final byte[] rawKey = serdes.rawKey(1);
+        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+            inStore(UNCACHED_TABLE)
+                .withQuery(RawKeyQuery.withKey(rawKey))
+                .withPositionBound(
+                    at(
+                        Position
+                            .emptyPosition()
+                            .withComponent(INPUT_TOPIC_NAME, 0, 0L)
+                            .withComponent(INPUT_TOPIC_NAME, 1, 1L)
+                    )
+                )
+        );
+
+        System.out.println("|||" + result);
+        final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+        final ValueAndTimestamp<Integer> value =
+            serdes.valueFrom(rawValueResult.getResult());
+        System.out.println("|||" + value);
+        assertThat(result.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldFailQueryOutsideOfBound() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final byte[] rawKey = serdes.rawKey(1);
+        // intentionally setting the bound higher than the current position.
+        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+            inStore(UNCACHED_TABLE)
+                .withQuery(RawKeyQuery.withKey(rawKey))
+                .withPositionBound(
+                    at(
+                        Position
+                            .emptyPosition()
+                            .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                            .withComponent(INPUT_TOPIC_NAME, 1, 2L)
+                    )
+                )
+        );
+
+        System.out.println("|||" + result);
+        final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+
+        final RuntimeException runtimeException = assertThrows(
+            RuntimeException.class,
+            rawValueResult::getResult
+        );
+        assertThat(
+            runtimeException.getMessage(),
+            is("NOT_UP_TO_BOUND: For store partition 0, the current position Position{position={input-topic={0=0}}} is not yet up to the bound PositionBound{position=Position{position={input-topic={0=1, 1=2}}}}"));
+
+        assertThat(rawValueResult.isFailure(), is(true));
+        assertThat(rawValueResult.getFailureReason(), is(FailureReason.NOT_UP_TO_BOUND));
+        assertThat(rawValueResult.getFailure(),
+            is("For store partition 0, the current position Position{position={input-topic={0=0}}} is not yet up to the bound PositionBound{position=Position{position={input-topic={0=1, 1=2}}}}"));
+        assertThat(result.getPosition(), is(Position.emptyPosition()));
+    }
+
+    @Test
+    public void shouldPartiallySucceedOnPartialBound() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
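+        // The bound for partition 0 (offset 0) is already met, but the bound for partition 1
+        // (offset 2) is ahead of its current position (offset 1), so only partition 0 succeeds.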
+        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+            kafkaStreams.query(
+                inStore(UNCACHED_TABLE)
+                    .withQuery(RawScanQuery.scan())
+                    .withPositionBound(
+                        at(
+                            Position
+                                .emptyPosition()
+                                .withComponent(INPUT_TOPIC_NAME, 0, 0L)
+                                .withComponent(INPUT_TOPIC_NAME, 1, 2L)
+                        )
+                    )
+            );
+
+        System.out.println("|||" + scanResult);
+        final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = scanResult.getPartitionResults();
+        for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
+            final QueryResult<KeyValueIterator<Bytes, byte[]>> value = entry.getValue();
+            if (value.isSuccess()) {
+                try (final KeyValueIterator<Bytes, byte[]> keyValueIterator =
+                    value.getResult()) {
+                    while (keyValueIterator.hasNext()) {
+                        final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
+                        System.out.println(
+                            "|||" + entry.getKey() +
+                                " " + serdes.keyFrom(next.key.get()) +
+                                " " + serdes.valueFrom(next.value)
+                        );
+                    }
+                }
+            }
+        }
+
+        assertThat(scanResult.getPartitionResults().get(0).isSuccess(), is(true));
+        assertThat(scanResult.getPartitionResults().get(1).isFailure(), is(true));
+        assertThat(scanResult.getPartitionResults().get(1).getFailureReason(),
+            is(FailureReason.NOT_UP_TO_BOUND));
+        assertThat(scanResult.getPosition(),
+            is(Position.fromMap(mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L)))))));
+    }
+
+    private static Properties streamsConfiguration() {
+        final String safeTestName = IQv2IntegrationTest.class.getName();
+        final Properties config = new Properties();
+        config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        return config;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 066f080..3772904 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -71,6 +71,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
@@ -786,6 +787,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
         EasyMock.expect(context.appConfigs())
             .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.expect(context.recordMetadata()).andStubReturn(Optional.empty());
         EasyMock.replay(context);
 
         rocksDBStore.init((StateStoreContext) context, rocksDBStore);
@@ -818,6 +820,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
         EasyMock.expect(context.appConfigs())
                 .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.expect(context.recordMetadata()).andStubReturn(Optional.empty());
         EasyMock.replay(context);
 
         rocksDBStore.init((StateStoreContext) context, rocksDBStore);