Posted to commits@kafka.apache.org by vv...@apache.org on 2021/11/24 05:10:38 UTC

[kafka] branch iqv2-framework created (now 2188078)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at 2188078  finalizing...

This branch includes the following new commits:

     new 3e30837  POC of the KIP-796 IQv2 proposal
     new da055cc  formalize Position
     new 2188078  finalizing...

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[kafka] 03/03: finalizing...

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2188078203c429ac3ff44198f3da4d57d4153ea2
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Nov 23 22:49:27 2021 -0600

    finalizing...
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   8 +-
 .../apache/kafka/streams/processor/StateStore.java |  14 +-
 .../streams/query/InteractiveQueryRequest.java     | 157 --------------
 .../streams/query/InteractiveQueryResult.java      |  72 -------
 .../apache/kafka/streams/query/PositionBound.java  |  36 +++-
 .../kafka/streams/query/StateQueryRequest.java     | 226 +++++++++++++++++++++
 .../kafka/streams/query/StateQueryResult.java      | 118 +++++++++++
 .../streams/integration/IQv2IntegrationTest.java   |  31 ++-
 .../kafka/streams/query/PositionBoundTest.java     | 100 +++++++++
 9 files changed, 505 insertions(+), 257 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2356096..4b926cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -63,8 +63,8 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
 import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.query.InteractiveQueryRequest;
-import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.HostInfo;
@@ -1772,14 +1772,14 @@ public class KafkaStreams implements AutoCloseable {
     }
 
     @Evolving
-    public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) {
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
         final String storeName = request.getStoreName();
         if (!topologyMetadata.hasStore(storeName)) {
             throw new UnknownStateStoreException(
                 "Cannot get state store " + storeName + " because no such store is registered in the topology."
             );
         }
-        final InteractiveQueryResult<R> result = new InteractiveQueryResult<>(new HashMap<>());
+        final StateQueryResult<R> result = new StateQueryResult<>();
 
         final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
         if (globalStateStores.containsKey(storeName)) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index b1c480f..2f96020 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
@@ -129,19 +130,26 @@ public interface StateStore {
      * a failure.
      * <p>
      * If the store doesn't know how to handle the given query, the result
-     * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+     * shall be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
      * If the store couldn't satisfy the given position bound, the result
-     * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * shall be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * <p>
+     * Note to store implementers: if your store does not support position tracking,
+     * you can correctly respond with {@link FailureReason#NOT_UP_TO_BOUND} if the argument is
+     * anything but {@link PositionBound#unbounded()}. Be sure to explain in the failure message
+     * that bounded positions are not supported.
+     * <p>
      * @param query The query to execute
      * @param positionBound The position the store must be at or past
      * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
      * @param <R> The result type
      */
+    @Evolving
     default <R> QueryResult<R> query(
         Query<R> query,
         PositionBound positionBound,
         boolean collectExecutionInfo) {
-
+        // If a store doesn't implement a query handler, then all queries are unknown.
         return QueryResult.forUnknownQueryType(query, this);
     }
 }
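
To illustrate the contract documented above: a store that does not track
its position can override the default handler, reject any bounded request,
and defer everything else to the unknown-query default. A minimal sketch in
Java (QueryResult.forFailure is an assumed factory method; only
forUnknownQueryType appears in this diff):

    // Inside a custom StateStore implementation that does not track position.
    @Override
    public <R> QueryResult<R> query(
        final Query<R> query,
        final PositionBound positionBound,
        final boolean collectExecutionInfo) {

        if (!positionBound.isUnbounded()) {
            // Assumed factory, analogous to forUnknownQueryType:
            return QueryResult.forFailure(
                FailureReason.NOT_UP_TO_BOUND,
                "This store does not track its position, so bounded positions are not supported."
            );
        }
        // With no handler for this specific query type, all queries are unknown.
        return QueryResult.forUnknownQueryType(query, this);
    }
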
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
deleted file mode 100644
index 56fc0a8..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.query;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * @param <R>
- */
-public class InteractiveQueryRequest<R> {
-
-    private final String storeName;
-    private final PositionBound position;
-    private final Optional<Set<Integer>> partitions;
-    private final Query<R> query;
-    private boolean executionInfoEnabled;
-    private boolean requireActive;
-
-    private InteractiveQueryRequest(
-        final String storeName,
-        final PositionBound position,
-        final Optional<Set<Integer>> partitions,
-        final Query<R> query,
-        final boolean executionInfoEnabled, final boolean requireActive) {
-
-        this.storeName = storeName;
-        this.position = position;
-        this.partitions = partitions;
-        this.query = query;
-        this.executionInfoEnabled = executionInfoEnabled;
-        this.requireActive = requireActive;
-    }
-
-    public static InStore inStore(final String name) {
-        return new InStore(name);
-    }
-
-    public InteractiveQueryRequest<R> withPositionBound(final PositionBound positionBound) {
-        return new InteractiveQueryRequest<>(
-            storeName,
-            positionBound,
-            partitions,
-            query,
-            executionInfoEnabled,
-            false);
-    }
-
-
-    public InteractiveQueryRequest<R> withNoPartitions() {
-        return new InteractiveQueryRequest<>(storeName,
-            position,
-            Optional.of(Collections.emptySet()),
-            query,
-            executionInfoEnabled,
-            requireActive);
-    }
-
-    public InteractiveQueryRequest<R> withAllPartitions() {
-        return new InteractiveQueryRequest<>(storeName,
-            position,
-            Optional.empty(),
-            query,
-            executionInfoEnabled,
-            requireActive);
-    }
-
-    public InteractiveQueryRequest<R> withPartitions(final Set<Integer> partitions) {
-        return new InteractiveQueryRequest<>(storeName,
-            position,
-            Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))),
-            query,
-            executionInfoEnabled,
-            requireActive);
-    }
-
-    public String getStoreName() {
-        return storeName;
-    }
-
-    public PositionBound getPositionBound() {
-        if (requireActive) {
-            throw new IllegalArgumentException();
-        }
-        return Objects.requireNonNull(position);
-    }
-
-    public Query<R> getQuery() {
-        return query;
-    }
-
-    public boolean isAllPartitions() {
-        return !partitions.isPresent();
-    }
-
-    public Set<Integer> getPartitions() {
-        if (!partitions.isPresent()) {
-            throw new UnsupportedOperationException(
-                "Cannot list partitions of an 'all partitions' request");
-        } else {
-            return partitions.get();
-        }
-    }
-
-    public InteractiveQueryRequest<R> enableExecutionInfo() {
-        return new InteractiveQueryRequest<>(storeName,
-            position,
-            partitions,
-            query,
-            true,
-            requireActive);
-    }
-
-    public boolean executionInfoEnabled() {
-        return executionInfoEnabled;
-    }
-
-    public boolean isRequireActive() {
-        return requireActive;
-    }
-
-    public static class InStore {
-
-        private final String name;
-
-        private InStore(final String name) {
-            this.name = name;
-        }
-
-        public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query) {
-            return new InteractiveQueryRequest<>(
-                name,
-                null,
-                Optional.empty(),
-                query,
-                false,
-                true);
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
deleted file mode 100644
index 4f8e728..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.query;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class InteractiveQueryResult<R> {
-
-    private final Map<Integer, QueryResult<R>> partitionResults;
-
-    public InteractiveQueryResult(final Map<Integer, QueryResult<R>> resultMap) {
-        partitionResults = resultMap;
-    }
-
-    public void setGlobalResult(final QueryResult<R> r) {
-
-    }
-
-    public void addResult(final int partition, final QueryResult<R> r) {
-        partitionResults.put(partition, r);
-    }
-
-    public Map<Integer, QueryResult<R>> getPartitionResults() {
-        return partitionResults;
-    }
-
-    public QueryResult<R> getOnlyPartitionResult() {
-        final List<QueryResult<R>> nonempty =
-            partitionResults
-                .values()
-                .stream()
-                .filter(r -> r.getResult() != null)
-                .collect(Collectors.toList());
-
-        if (nonempty.size() != 1) {
-            throw new IllegalStateException();
-        } else {
-            return nonempty.get(0);
-        }
-    }
-
-    public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
-        }
-        return position;
-    }
-
-    @Override
-    public String toString() {
-        return "InteractiveQueryResult{" +
-            "partitionResults=" + partitionResults +
-            '}';
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
index a4dbe35..185a88e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
@@ -17,8 +17,17 @@
 package org.apache.kafka.streams.query;
 
 
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
 import java.util.Objects;
 
+/**
+ * A class bounding the processing state Position during queries.
+ * This can be used to specify that a query should fail if the
+ * locally available partition isn't caught up to the specified bound.
+ * "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
 public class PositionBound {
 
     private final Position position;
@@ -27,26 +36,45 @@ public class PositionBound {
     private PositionBound(final Position position, final boolean unbounded) {
         if (unbounded && position != null) {
             throw new IllegalArgumentException();
+        } else if (position != null) {
+            this.position = position.copy();
+            this.unbounded = false;
+        } else {
+            this.position = null;
+            this.unbounded = unbounded;
         }
-        this.position = position;
-        this.unbounded = unbounded;
     }
 
+    /**
+     * Creates a new PositionBound representing "no bound"
+     */
     public static PositionBound unbounded() {
         return new PositionBound(null, true);
     }
 
+    /**
+     * Creates a new PositionBound representing a specific position.
+     */
     public static PositionBound at(final Position position) {
         return new PositionBound(position, false);
     }
 
+    /**
+     * Returns true iff this object specifies that there is no position bound.
+     */
     public boolean isUnbounded() {
         return unbounded;
     }
 
+    /**
+     * Returns the specific position of this bound.
+     * @throws IllegalArgumentException if this is an "unbounded" position.
+     */
     public Position position() {
         if (unbounded) {
-            throw new IllegalArgumentException();
+            throw new IllegalArgumentException(
+                "Cannot get the position of an unbounded PositionBound."
+            );
         } else {
             return position;
         }
@@ -75,6 +103,6 @@ public class PositionBound {
 
     @Override
     public int hashCode() {
-        return Objects.hash(position, unbounded);
+        throw new UnsupportedOperationException("This mutable object is not suitable as a hash key");
     }
 }
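
As a usage sketch of the API above (using only methods visible in this
diff): the constructor now takes a defensive copy, so mutating the caller's
Position after constructing the bound has no effect on it:

    // Build a bound at a specific position.
    final Position position = Position.emptyPosition();
    position.withComponent("topic", 0, 5L);
    final PositionBound bound = PositionBound.at(position);

    // The bound holds a copy, so this later mutation is not observed by it.
    position.withComponent("topic", 1, 10L);

    // An unbounded bound places no requirement on the store's position.
    final PositionBound none = PositionBound.unbounded();
    assert none.isUnbounded();
    // Calling none.position() here would throw IllegalArgumentException.
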
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
new file mode 100644
index 0000000..1f3dbf0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The request object for Interactive Queries. This is an immutable builder class for passing all
+ * required and optional arguments for querying a state store in Kafka Streams.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryRequest<R> {
+
+    private final String storeName;
+    private final PositionBound position;
+    private final Optional<Set<Integer>> partitions;
+    private final Query<R> query;
+    private final boolean executionInfoEnabled;
+    private final boolean requireActive;
+
+    private StateQueryRequest(
+        final String storeName,
+        final PositionBound position,
+        final Optional<Set<Integer>> partitions,
+        final Query<R> query,
+        final boolean executionInfoEnabled,
+        final boolean requireActive) {
+
+        this.storeName = storeName;
+        this.position = position;
+        this.partitions = partitions;
+        this.query = query;
+        this.executionInfoEnabled = executionInfoEnabled;
+        this.requireActive = requireActive;
+    }
+
+    /**
+     * Specifies the name of the store to query.
+     */
+    public static InStore inStore(final String name) {
+        return new InStore(name);
+    }
+
+    /**
+     * Bounds the position of the state store against its input topics.
+     */
+    public StateQueryRequest<R> withPositionBound(final PositionBound positionBound) {
+        return new StateQueryRequest<>(
+            storeName,
+            positionBound,
+            partitions,
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+
+    /**
+     * Specifies that the query will run against all locally available partitions.
+     */
+    public StateQueryRequest<R> withAllPartitions() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            Optional.empty(),
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+    /**
+     * Specifies a set of partitions to run against. If some partitions are not locally available,
+     * the response will contain a {@link FailureReason#NOT_PRESENT} for those partitions. If
+     * some partitions in this set are not valid partitions for the store, the response will
+     * contain a {@link FailureReason#DOES_NOT_EXIST} for those partitions.
+     */
+    public StateQueryRequest<R> withPartitions(final Set<Integer> partitions) {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))),
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+    /**
+     * Requests that stores and the Streams runtime record any useful details about
+     * how the query was executed.
+     */
+    public StateQueryRequest<R> enableExecutionInfo() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            partitions,
+            query,
+            true,
+            requireActive
+        );
+    }
+
+    /**
+     * Specifies that this query should only run on partitions for which this instance is
+     * the leader (aka "active"). Partitions for which this instance is not the active replica
+     * will return {@link FailureReason#NOT_UP_TO_BOUND}.
+     */
+    public StateQueryRequest<R> requireActive() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            partitions,
+            query,
+            executionInfoEnabled,
+            true
+        );
+    }
+
+    /**
+     * The name of the store this request is for.
+     */
+    public String getStoreName() {
+        return storeName;
+    }
+
+    /**
+     * The bound that this request places on its query, in terms of the partitions' positions
+     * against its inputs.
+     */
+    public PositionBound getPositionBound() {
+        return position;
+    }
+
+    /**
+     * The query this request is meant to run.
+     */
+    public Query<R> getQuery() {
+        return query;
+    }
+
+    /**
+     * Whether this request should fetch from all locally available partitions.
+     */
+    public boolean isAllPartitions() {
+        return !partitions.isPresent();
+    }
+
+    /**
+     * If the request is for specific partitions, return the set of partitions to query.
+     *
+     * @throws IllegalStateException if this is a request for all partitions
+     */
+    public Set<Integer> getPartitions() {
+        if (!partitions.isPresent()) {
+            throw new IllegalStateException(
+                "Cannot list partitions of an 'all partitions' request");
+        } else {
+            return partitions.get();
+        }
+    }
+
+    /**
+     * Whether the request includes detailed execution information.
+     */
+    public boolean executionInfoEnabled() {
+        return executionInfoEnabled;
+    }
+
+    /**
+     * Whether this request requires the query to execute only on active partitions.
+     */
+    public boolean isRequireActive() {
+        return requireActive;
+    }
+
+    /**
+     * A progressive builder interface for creating {@code StateQueryRequest}s.
+     */
+    public static class InStore {
+
+        private final String name;
+
+        private InStore(final String name) {
+            this.name = name;
+        }
+
+        /**
+         * Specifies the query to run on the specified store.
+         */
+        public <R> StateQueryRequest<R> withQuery(final Query<R> query) {
+            return new StateQueryRequest<>(
+                name, // name is already specified
+                PositionBound.unbounded(), // default: unbounded
+                Optional.empty(), // default: all partitions
+                query, // the query is specified
+                false, // default: no execution info
+                false // default: don't require active
+            );
+        }
+    }
+}
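
Taken together, the progressive builder reads fluently. A sketch of
composing a request (the store name and key are illustrative):

    import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

    final StateQueryRequest<ValueAndTimestamp<Integer>> request =
        inStore("my-store")                                     // required: the store
            .withQuery(KeyQuery.withKey(1))                     // required: the query
            .withPartitions(new HashSet<>(Arrays.asList(0, 1))) // default: all partitions
            .withPositionBound(PositionBound.unbounded())       // default: unbounded
            .enableExecutionInfo();                             // default: off

Because each with- method returns a new immutable request, intermediate
requests can be shared and re-specialized safely.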
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
new file mode 100644
index 0000000..06cc6b4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries.
+ * This wraps the individual partition results, as well as
+ * metadata relating to the result as a whole.
+ * <p>
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query.
+     * Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {
+        this.globalResult = r;
+    }
+
+    /**
+     * Set the result for a partitioned store query.
+     * Used by Kafka Streams and available for tests.
+     */
+    public void addResult(final int partition, final QueryResult<R> r) {
+        partitionResults.put(partition, r);
+    }
+
+
+    /**
+     * The query's result for each partition that executed the query.
+     * Empty for global store queries.
+     */
+    public Map<Integer, QueryResult<R>> getPartitionResults() {
+        return partitionResults;
+    }
+
+    /**
+     * For queries that are expected to match records in only one
+     * partition, returns the result.
+     *
+     * @throws IllegalArgumentException if the results are not for exactly one partition.
+     */
+    public QueryResult<R> getOnlyPartitionResult() {
+        final List<QueryResult<R>> nonempty =
+            partitionResults
+                .values()
+                .stream()
+                .filter(r -> r.getResult() != null)
+                .collect(Collectors.toList());
+
+        if (nonempty.size() != 1) {
+            throw new IllegalArgumentException(
+                "The query did not return exactly one partition result: " + partitionResults
+            );
+        } else {
+            return nonempty.get(0);
+        }
+    }
+
+    /**
+     * The query's result for global store queries. Is {@code null} for
+     * non-global (partitioned) store queries.
+     */
+    public QueryResult<R> getGlobalResult() {
+        return globalResult;
+    }
+
+    /**
+     * The position of the state store at the moment it executed the
+     * query. In conjunction
+     * with {@link StateQueryRequest#withPositionBound}, this can be
+     * used to achieve a good balance between consistency and
+     * availability in which repeated queries are guaranteed to
+     * advance in time while allowing reads to be served from any
+     * replica that is caught up to that caller's prior observations.
+     */
+    public Position getPosition() {
+        Position position = Position.emptyPosition();
+        for (final QueryResult<R> r : partitionResults.values()) {
+            position = position.merge(r.getPosition());
+        }
+        return position;
+    }
+
+    @Override
+    public String toString() {
+        return "StateQueryResult{" +
+            "partitionResults=" + partitionResults +
+            ", globalResult=" + globalResult +
+            '}';
+    }
+}
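
The getPosition javadoc above describes the consistency pattern in prose;
concretely, a caller can thread the observed position back into its next
request. A sketch, continuing the request from the previous example:

    final StateQueryResult<ValueAndTimestamp<Integer>> result =
        kafkaStreams.query(request);

    // A single-key query should match in exactly one partition.
    final ValueAndTimestamp<Integer> value =
        result.getOnlyPartitionResult().getResult();

    // Bound the next query at the merged position just observed, so that
    // repeated reads never travel backwards in time, even across replicas.
    final StateQueryRequest<ValueAndTimestamp<Integer>> next =
        request.withPositionBound(PositionBound.at(result.getPosition()));
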
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 23374f3..40f0062 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
-import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -37,8 +36,8 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.query.FailureReason;
-import org.apache.kafka.streams.query.InteractiveQueryRequest;
-import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.query.Iterators;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
@@ -47,8 +46,6 @@ import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.query.RawScanQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
@@ -76,7 +73,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.streams.query.InteractiveQueryRequest.inStore;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
 import static org.apache.kafka.streams.query.PositionBound.at;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -189,7 +186,7 @@ public class IQv2IntegrationTest {
             kafkaStreams.serdesForStore(CACHED_TABLE);
 
         final byte[] rawKey = serdes.rawKey(1);
-        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+        final StateQueryResult<byte[]> result = kafkaStreams.query(
             inStore(CACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
 
         System.out.println("|||" + result);
@@ -211,7 +208,7 @@ public class IQv2IntegrationTest {
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
         final byte[] rawKey = serdes.rawKey(1);
-        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+        final StateQueryResult<byte[]> result = kafkaStreams.query(
             inStore(UNCACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
 
         System.out.println("|||" + result);
@@ -230,10 +227,10 @@ public class IQv2IntegrationTest {
     public void shouldQueryTypedKeyFromUncachedTable() {
         final Integer key = 1;
 
-        final InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
+        final StateQueryRequest<ValueAndTimestamp<Integer>> query =
             inStore(UNCACHED_TABLE).withQuery(KeyQuery.withKey(key));
 
-        final InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);
+        final StateQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);
 
         final ValueAndTimestamp<Integer> value = result.getOnlyPartitionResult().getResult();
 
@@ -247,10 +244,10 @@ public class IQv2IntegrationTest {
     public void exampleKeyQueryIntoWindowStore() {
         final Windowed<Integer> key = new Windowed<>(1, new TimeWindow(0L, 99L));
 
-        final InteractiveQueryRequest<ValueAndTimestamp<Long>> query =
+        final StateQueryRequest<ValueAndTimestamp<Long>> query =
             inStore(UNCACHED_COUNTS_TABLE).withQuery(KeyQuery.withKey(key));
 
-        final InteractiveQueryResult<ValueAndTimestamp<Long>> result = kafkaStreams.query(query);
+        final StateQueryResult<ValueAndTimestamp<Long>> result = kafkaStreams.query(query);
 
         final ValueAndTimestamp<Long> value = result.getOnlyPartitionResult().getResult();
 
@@ -263,7 +260,7 @@ public class IQv2IntegrationTest {
         final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
-        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+        final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
             kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
 
         System.out.println("|||" + scanResult);
@@ -294,7 +291,7 @@ public class IQv2IntegrationTest {
         final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
-        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+        final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
             kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
 
         System.out.println("|||" + scanResult);
@@ -330,7 +327,7 @@ public class IQv2IntegrationTest {
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
         final byte[] rawKey = serdes.rawKey(1);
-        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+        final StateQueryResult<byte[]> result = kafkaStreams.query(
             inStore(UNCACHED_TABLE)
                 .withQuery(RawKeyQuery.withKey(rawKey))
                 .withPositionBound(
@@ -361,7 +358,7 @@ public class IQv2IntegrationTest {
 
         final byte[] rawKey = serdes.rawKey(1);
         // intentionally setting the bound higher than the current position.
-        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+        final StateQueryResult<byte[]> result = kafkaStreams.query(
             inStore(UNCACHED_TABLE)
                 .withQuery(RawKeyQuery.withKey(rawKey))
                 .withPositionBound(
@@ -399,7 +396,7 @@ public class IQv2IntegrationTest {
         final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
-        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+        final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
             kafkaStreams.query(
                 inStore(UNCACHED_TABLE)
                     .withQuery(RawScanQuery.scan())
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
new file mode 100644
index 0000000..41175c2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class PositionBoundTest {
+
+    @Test
+    public void shouldCopyPosition() {
+        final Position position = Position.emptyPosition();
+        final PositionBound positionBound = PositionBound.at(position);
+        position.withComponent("topic", 1, 2L);
+
+        assertThat(position.getTopics(), equalTo(mkSet("topic")));
+        assertThat(positionBound.position().getTopics(), empty());
+    }
+
+    @Test
+    public void unboundedShouldBeUnbounded() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertTrue(bound.isUnbounded());
+    }
+
+    @Test
+    public void unboundedShouldThrowOnPosition() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertThrows(IllegalArgumentException.class, bound::position);
+    }
+
+    @Test
+    public void shouldEqualPosition() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        final PositionBound bound2 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualUnbounded() {
+        final PositionBound bound1 = PositionBound.unbounded();
+        final PositionBound bound2 = PositionBound.unbounded();
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualSelf() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound1);
+    }
+
+    @Test
+    public void shouldNotEqualNull() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertNotEquals(bound1, null);
+    }
+
+    @Test
+    public void shouldNotHash() {
+        final PositionBound bound = PositionBound.at(Position.emptyPosition());
+        assertThrows(UnsupportedOperationException.class, bound::hashCode);
+
+        // going overboard...
+        final HashSet<PositionBound> set = new HashSet<>();
+        assertThrows(UnsupportedOperationException.class, () -> set.add(bound));
+
+        final HashMap<PositionBound, Integer> map = new HashMap<>();
+        assertThrows(UnsupportedOperationException.class, () -> map.put(bound, 5));
+    }
+}

[kafka] 01/03: POC of the KIP-796 IQv2 proposal

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 3e308374fc1289c69e28acc0a8a96fa35c3d56fd
Author: John Roesler <vv...@apache.org>
AuthorDate: Thu Oct 14 12:53:11 2021 -0500

    POC of the KIP-796 IQv2 proposal
    
    fix test
    
    Prototype of using KeyQuery against the Windowed store
    
    updates
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 125 +++++-
 .../apache/kafka/streams/processor/StateStore.java |  25 ++
 .../kafka/streams/processor/StateStoreContext.java |   4 +-
 .../apache/kafka/streams/query/FailureReason.java  |  44 ++
 .../streams/query/InteractiveQueryRequest.java     | 157 +++++++
 .../streams/query/InteractiveQueryResult.java      |  72 ++++
 .../org/apache/kafka/streams/query/Iterators.java  |  82 ++++
 .../KeyQuery.java}                                 |  26 +-
 .../org/apache/kafka/streams/query/Position.java   | 192 +++++++++
 .../apache/kafka/streams/query/PositionBound.java  |  80 ++++
 .../Query.java}                                    |  16 +-
 .../apache/kafka/streams/query/QueryResult.java    | 150 +++++++
 .../RawKeyQuery.java}                              |  32 +-
 .../RawScanQuery.java}                             |  21 +-
 .../apache/kafka/streams/state/StateSerdes.java    |   9 +
 .../AbstractRocksDBSegmentedBytesStore.java        |  18 +
 .../state/internals/InMemoryKeyValueStore.java     |  12 +
 .../InMemoryTimeOrderedKeyValueBuffer.java         |  12 +
 .../streams/state/internals/MemoryLRUCache.java    |  12 +
 .../state/internals/MeteredKeyValueStore.java      |  41 ++
 .../state/internals/MeteredSessionStore.java       |   4 +
 .../state/internals/MeteredWindowStore.java        | 117 ++++--
 .../state/internals/QueryableStoreProvider.java    |   5 +
 .../streams/state/internals/RocksDBStore.java      |  42 ++
 .../streams/state/internals/StoreQueryUtils.java   | 122 ++++++
 .../internals/TimestampedKeyValueStoreBuilder.java |  15 +
 .../internals/TimestampedWindowStoreBuilder.java   |  15 +
 .../state/internals/ValueAndTimestampSerde.java    |  10 +
 .../WindowToTimestampedWindowByteStoreAdapter.java |  12 +
 .../streams/state/internals/WrappedStateStore.java |  18 +
 .../streams/integration/IQv2IntegrationTest.java   | 460 +++++++++++++++++++++
 .../streams/state/internals/RocksDBStoreTest.java  |   3 +
 32 files changed, 1867 insertions(+), 86 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bb0c40a..2356096 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -40,29 +41,38 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsNotStartedException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.UnknownStateStoreException;
-import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ClientUtils;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.InteractiveQueryRequest;
+import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredSessionStore;
+import org.apache.kafka.streams.state.internals.MeteredWindowStore;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
@@ -78,17 +88,18 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -1716,4 +1727,112 @@ public class KafkaStreams implements AutoCloseable {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) {
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
+
+        // TODO this is a hack. We ought to be able to create the serdes independently of the
+        // TODO stores and cache them in the topology.
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            return getSerdes(store);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final StateStore store = entry.getValue().getStore(storeName);
+                    if (store != null) {
+                        return getSerdes(store);
+                    }
+                }
+            }
+        }
+        // there may be no local copy of this store.
+        // This is the main reason I want to decouple serde
+        // creation from the store itself.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) {
+        if (store instanceof MeteredKeyValueStore) {
+            return ((MeteredKeyValueStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredSessionStore) {
+            return ((MeteredSessionStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredWindowStore) {
+            return ((MeteredWindowStore<K, V>) store).serdes();
+        } else {
+            throw new IllegalArgumentException("Unknown store type: " + store);
+        }
+    }
+
+    @Evolving
+    public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
+        final InteractiveQueryResult<R> result = new InteractiveQueryResult<>(new HashMap<>());
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.notActive(
+                                        state,
+                                        active,
+                                        partition
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 76d1ab4..b1c480f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -19,6 +19,10 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
@@ -119,4 +123,25 @@ public interface StateStore {
      * @return {@code true} if the store is open
      */
     boolean isOpen();
+
+    /**
+     * Execute a query. Returns a QueryResult containing either result data or
+     * a failure.
+     * <p>
+     * If the store doesn't know how to handle the given query, the result
+     * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+     * If the store couldn't satisfy the given position bound, the result
+     * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * @param query The query to execute
+     * @param positionBound The position the store must be at or past
+     * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
+     * @param <R> The result type
+     */
+    default <R> QueryResult<R> query(
+        Query<R> query,
+        PositionBound positionBound,
+        boolean collectExecutionInfo) {
+
+        return QueryResult.forUnknownQueryType(query, this);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
index 50d5879..f6f1446 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
@@ -46,8 +46,8 @@ public interface StateStoreContext {
 
     /**
      * Return the metadata of the current topic/partition/offset if available.
-     * This is defined as the metadata of the record that is currently been
-     * processed by the StreamTask that holds the store.
+     * This is defined as the metadata of the record that is currently being
+     * processed (or was last processed) by the StreamTask that holds the store.
      * <p>
      * Note that the metadata is not defined during all store interactions, for
      * example, while the StreamTask is running a punctuation.
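
The clarified wording matters for position tracking: a store can consult the
record metadata while persisting a write in order to maintain its Position.
A sketch, assuming the accessor is named recordMetadata() and returns an
Optional (the method signature is not visible in this hunk):

    // Inside a store wrapper holding a StateStoreContext 'context'
    // and a mutable Position 'position'.
    public void put(final Bytes key, final byte[] value) {
        inner.put(key, value);
        // Assumed accessor; only its javadoc appears in this hunk.
        context.recordMetadata().ifPresent(meta ->
            position.withComponent(meta.topic(), meta.partition(), meta.offset())
        );
    }
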
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
new file mode 100644
index 0000000..02db7fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+public enum FailureReason {
+    /**
+     * Failure indicating that the store doesn't know how to handle the given query.
+     */
+    UNKNOWN_QUERY_TYPE,
+
+    /**
+     * Failure indicating that the store partition is not (yet) up to the desired bound.
+     * The caller should either try again later or try a different replica.
+     */
+    NOT_UP_TO_BOUND,
+
+    /**
+     * Failure indicating that the requested store partition is not present on the local
+     * KafkaStreams instance. It may have been migrated to another instance during a rebalance.
+     * The caller is recommended to try a different replica.
+     */
+    NOT_PRESENT,
+
+    /**
+     * The requested store partition does not exist at all. For example, partition 4 was requested,
+     * but the store in question only has 4 partitions (0 through 3).
+     */
+    DOES_NOT_EXIST;
+}
\ No newline at end of file
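
A caller would branch on these reasons to decide between retrying locally
and failing over to another replica. A sketch, assuming the failed
QueryResult exposes its reason through a hypothetical getFailureReason()
accessor (no such accessor appears in this diff):

    switch (failedResult.getFailureReason()) { // hypothetical accessor
        case UNKNOWN_QUERY_TYPE:
            // Programming error: this store cannot serve this query type.
            throw new IllegalStateException("Unsupported query type for this store.");
        case NOT_UP_TO_BOUND:
            // Retry later, or relax the position bound.
            break;
        case NOT_PRESENT:
            // The partition migrated during a rebalance; try another instance.
            break;
        case DOES_NOT_EXIST:
            // The requested partition number is invalid for this store.
            throw new IllegalArgumentException("No such store partition.");
    }
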
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
new file mode 100644
index 0000000..56fc0a8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A request to execute a {@link Query} against a specific state store, optionally
+ * constrained to a subset of partitions and to a {@link PositionBound}.
+ *
+ * @param <R> The type of the query result.
+ */
+public class InteractiveQueryRequest<R> {
+
+    private final String storeName;
+    private final PositionBound position;
+    private final Optional<Set<Integer>> partitions;
+    private final Query<R> query;
+    private final boolean executionInfoEnabled;
+    private final boolean requireActive;
+
+    private InteractiveQueryRequest(
+        final String storeName,
+        final PositionBound position,
+        final Optional<Set<Integer>> partitions,
+        final Query<R> query,
+        final boolean executionInfoEnabled, final boolean requireActive) {
+
+        this.storeName = storeName;
+        this.position = position;
+        this.partitions = partitions;
+        this.query = query;
+        this.executionInfoEnabled = executionInfoEnabled;
+        this.requireActive = requireActive;
+    }
+
+    public static InStore inStore(final String name) {
+        return new InStore(name);
+    }
+
+    public InteractiveQueryRequest<R> withPositionBound(final PositionBound positionBound) {
+        return new InteractiveQueryRequest<>(
+            storeName,
+            positionBound,
+            partitions,
+            query,
+            executionInfoEnabled,
+            // an explicit position bound supersedes the default require-active mode
+            false);
+    }
+
+
+    public InteractiveQueryRequest<R> withNoPartitions() {
+        return new InteractiveQueryRequest<>(storeName,
+            position,
+            Optional.of(Collections.emptySet()),
+            query,
+            executionInfoEnabled,
+            requireActive);
+    }
+
+    public InteractiveQueryRequest<R> withAllPartitions() {
+        return new InteractiveQueryRequest<>(storeName,
+            position,
+            Optional.empty(),
+            query,
+            executionInfoEnabled,
+            requireActive);
+    }
+
+    public InteractiveQueryRequest<R> withPartitions(final Set<Integer> partitions) {
+        return new InteractiveQueryRequest<>(storeName,
+            position,
+            Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))),
+            query,
+            executionInfoEnabled,
+            requireActive);
+    }
+
+    public String getStoreName() {
+        return storeName;
+    }
+
+    public PositionBound getPositionBound() {
+        if (requireActive) {
+            throw new IllegalArgumentException(
+                "This request requires the active replica, so it has no position bound.");
+        }
+        return Objects.requireNonNull(position);
+    }
+
+    public Query<R> getQuery() {
+        return query;
+    }
+
+    public boolean isAllPartitions() {
+        return !partitions.isPresent();
+    }
+
+    public Set<Integer> getPartitions() {
+        if (!partitions.isPresent()) {
+            throw new UnsupportedOperationException(
+                "Cannot list partitions of an 'all partitions' request");
+        } else {
+            return partitions.get();
+        }
+    }
+
+    public InteractiveQueryRequest<R> enableExecutionInfo() {
+        return new InteractiveQueryRequest<>(storeName,
+            position,
+            partitions,
+            query,
+            true,
+            requireActive);
+    }
+
+    public boolean executionInfoEnabled() {
+        return executionInfoEnabled;
+    }
+
+    public boolean isRequireActive() {
+        return requireActive;
+    }
+
+    public static class InStore {
+
+        private final String name;
+
+        private InStore(final String name) {
+            this.name = name;
+        }
+
+        public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query) {
+            return new InteractiveQueryRequest<>(
+                name,
+                null,
+                Optional.empty(),
+                query,
+                false,
+                true);
+        }
+    }
+}
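
As a usage sketch (the store name, topic, and key are hypothetical; KeyQuery,
Position, and PositionBound appear later in this patch):

    import java.util.Collections;
    import org.apache.kafka.streams.query.InteractiveQueryRequest;
    import org.apache.kafka.streams.query.KeyQuery;
    import org.apache.kafka.streams.query.Position;
    import org.apache.kafka.streams.query.PositionBound;

    public final class RequestExample {

        public static InteractiveQueryRequest<Integer> buildRequest() {
            // Query partition 0 of "my-store" for key "a", requiring that the
            // replica has seen at least offset 42 of the input topic, and ask
            // for execution trace information.
            return InteractiveQueryRequest
                .inStore("my-store")
                .withQuery(KeyQuery.<String, Integer>withKey("a"))
                .withPartitions(Collections.singleton(0))
                .withPositionBound(PositionBound.at(
                    Position.emptyPosition().withComponent("input-topic", 0, 42L)))
                .enableExecutionInfo();
        }
    }
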
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
new file mode 100644
index 0000000..4f8e728
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class InteractiveQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults;
+
+    public InteractiveQueryResult(final Map<Integer, QueryResult<R>> resultMap) {
+        partitionResults = resultMap;
+    }
+
+    public void setGlobalResult(final QueryResult<R> r) {
+        // currently a no-op: results are only tracked per partition
+    }
+
+    public void addResult(final int partition, final QueryResult<R> r) {
+        partitionResults.put(partition, r);
+    }
+
+    public Map<Integer, QueryResult<R>> getPartitionResults() {
+        return partitionResults;
+    }
+
+    public QueryResult<R> getOnlyPartitionResult() {
+        final List<QueryResult<R>> nonempty =
+            partitionResults
+                .values()
+                .stream()
+                .filter(r -> r.getResult() != null)
+                .collect(Collectors.toList());
+
+        if (nonempty.size() != 1) {
+            throw new IllegalStateException(
+                "Expected exactly one non-empty partition result, but got " + nonempty.size());
+        } else {
+            return nonempty.get(0);
+        }
+    }
+
+    public Position getPosition() {
+        Position position = Position.emptyPosition();
+        for (final QueryResult<R> r : partitionResults.values()) {
+            position = position.merge(r.getPosition());
+        }
+        return position;
+    }
+
+    @Override
+    public String toString() {
+        return "InteractiveQueryResult{" +
+            "partitionResults=" + partitionResults +
+            '}';
+    }
+}
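
And a minimal sketch of unwrapping a single-partition response (assuming the
request targeted exactly one partition; the merged position is useful for
chaining read-your-writes queries):

    import org.apache.kafka.streams.query.InteractiveQueryResult;
    import org.apache.kafka.streams.query.Position;
    import org.apache.kafka.streams.query.QueryResult;

    public final class ResultExample {

        public static Integer unwrap(final InteractiveQueryResult<Integer> result) {
            final QueryResult<Integer> partitionResult = result.getOnlyPartitionResult();
            // surface failures such as NOT_UP_TO_BOUND as exceptions
            partitionResult.throwIfFailure();
            // the merged position can seed the bound of a follow-up request
            final Position seen = result.getPosition();
            System.out.println("Queried as of " + seen);
            return partitionResult.getResult();
        }
    }
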
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java b/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java
new file mode 100644
index 0000000..d7ea7e9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.utils.CloseableIterator;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Objects;
+
+public final class Iterators {
+
+    private Iterators() {
+    }
+
+    public static <E, I extends Iterator<E>> CloseableIterator<E> collate(final Collection<I> iterators) {
+        return new CloseableIterator<E>() {
+            private final Deque<I> iteratorQueue = new LinkedList<>(iterators);
+
+            @Override
+            public void close() {
+                RuntimeException exception = null;
+                for (final I iterator : iterators) {
+                    if (iterator instanceof Closeable) {
+                        try {
+                            ((Closeable) iterator).close();
+                        } catch (final IOException e) {
+                            if (exception == null) {
+                                exception = new RuntimeException(
+                                    "Exception closing collated iterator", e);
+                            } else {
+                                exception.addSuppressed(e);
+                            }
+                        }
+                    }
+                }
+                if (exception != null) {
+                    throw exception;
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                for (int i = 0; i < iterators.size(); i++) {
+                    final Iterator<E> iterator = Objects.requireNonNull(iteratorQueue.peek());
+                    if (iterator.hasNext()) {
+                        return true;
+                    } else {
+                        // rotate the exhausted iterator to the back of the queue
+                        iteratorQueue.addLast(iteratorQueue.poll());
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public E next() {
+                // keep drawing from the head iterator until hasNext() rotates it away
+                final I iterator = iteratorQueue.poll();
+                final E next = Objects.requireNonNull(iterator).next();
+                iteratorQueue.push(iterator);
+                return next;
+            }
+        };
+    }
+}
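
A quick usage sketch of the collation (the literal values are illustrative):

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.kafka.common.utils.CloseableIterator;
    import org.apache.kafka.streams.query.Iterators;

    public final class CollateExample {

        public static void main(final String[] args) {
            final Iterator<String> a = Arrays.asList("a1", "a2").iterator();
            final Iterator<String> b = Arrays.asList("b1").iterator();
            // Drains each iterator in turn, rotating past exhausted ones, and
            // closes any Closeable members when the collated iterator closes.
            try (CloseableIterator<String> collated = Iterators.collate(Arrays.asList(a, b))) {
                while (collated.hasNext()) {
                    System.out.println(collated.next()); // a1, a2, b1
                }
            }
        }
    }
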
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
similarity index 56%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
index 1936d29..3900739 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
@@ -14,19 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+public class KeyQuery<K, V> implements Query<V> {
 
-import static java.util.Objects.requireNonNull;
+    private final K key;
 
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
-    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        super(
-            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
-            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
-        );
+    private KeyQuery(final K key) {
+        this.key = key;
     }
-}
\ No newline at end of file
+
+    public static <K, V> KeyQuery<K, V> withKey(final K key) {
+        return new KeyQuery<>(key);
+    }
+
+    public K getKey() {
+        return key;
+    }
+}
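
KeyQuery is generic in both the key and the value type, so the same class covers
plain and windowed lookups (a sketch; the String/Long types are illustrative, and
the windowed form matches what MeteredWindowStore handles later in this patch):

    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.query.KeyQuery;

    public final class KeyQueryExamples {

        // K = String, V = Long: a plain key lookup.
        public static KeyQuery<String, Long> plain() {
            return KeyQuery.withKey("sensor-1");
        }

        // K = Windowed<String>, V = Long: a windowed key lookup.
        public static KeyQuery<Windowed<String>, Long> windowed(final Windowed<String> key) {
            return KeyQuery.withKey(key);
        }
    }
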
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
new file mode 100644
index 0000000..5b0e981
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+public class Position {
+
+    private final Map<String, Map<Integer, Long>> position;
+
+    private Position(final Map<String, Map<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    public static Position emptyPosition() {
+        return new Position(new HashMap<>());
+    }
+
+    public static Position fromMap(final Map<String, Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        final Map<String, Map<Integer, Long>> updated = deepCopy(position);
+        updated.computeIfAbsent(topic, k -> new HashMap<>()).put(partition, offset);
+        return new Position(updated);
+    }
+
+    public Position merge(final Position other) {
+        if (other == null) {
+            return this;
+        } else {
+            final Map<String, Map<Integer, Long>> copy = deepCopy(position);
+            for (final Entry<String, Map<Integer, Long>> entry : other.position.entrySet()) {
+                final String topic = entry.getKey();
+                final Map<Integer, Long> partitionMap =
+                    copy.computeIfAbsent(topic, k -> new HashMap<>());
+                for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
+                    final Integer partition = partitionOffset.getKey();
+                    final Long offset = partitionOffset.getValue();
+                    if (!partitionMap.containsKey(partition)
+                        || partitionMap.get(partition) < offset) {
+                        partitionMap.put(partition, offset);
+                    }
+                }
+            }
+            return new Position(copy);
+        }
+    }
+
+    public Set<String> getTopics() {
+        return Collections.unmodifiableSet(position.keySet());
+    }
+
+    public Map<Integer, Long> getBound(final String topic) {
+        return Collections.unmodifiableMap(position.get(topic));
+    }
+
+    public ByteBuffer serialize() {
+        final byte version = (byte) 0;
+
+        int arraySize = Byte.BYTES; // version
+
+        final int nTopics = position.size();
+        arraySize += Integer.BYTES; // number of topics
+
+        final ArrayList<Entry<String, Map<Integer, Long>>> entries =
+            new ArrayList<>(position.entrySet());
+        final byte[][] topics = new byte[entries.size()][];
+
+        for (int i = 0; i < nTopics; i++) {
+            final Entry<String, Map<Integer, Long>> entry = entries.get(i);
+            final byte[] topicBytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
+            topics[i] = topicBytes;
+            arraySize += Integer.BYTES; // topic name length
+            arraySize += topicBytes.length; // topic name itself
+
+            final Map<Integer, Long> partitionOffsets = entry.getValue();
+            arraySize += Integer.BYTES; // number of partition/offset pairs
+            arraySize += (Integer.BYTES + Long.BYTES)
+                * partitionOffsets.size(); // the partition/offset pairs themselves
+        }
+
+        final ByteBuffer buffer = ByteBuffer.allocate(arraySize);
+        buffer.put(version);
+
+        buffer.putInt(nTopics);
+        for (int i = 0; i < nTopics; i++) {
+            buffer.putInt(topics[i].length);
+            buffer.put(topics[i]);
+
+            final Entry<String, Map<Integer, Long>> entry = entries.get(i);
+            final Map<Integer, Long> partitionOffsets = entry.getValue();
+            buffer.putInt(partitionOffsets.size());
+            for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
+                buffer.putInt(partitionOffset.getKey());
+                buffer.putLong(partitionOffset.getValue());
+            }
+        }
+
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Position deserialize(final ByteBuffer buffer) {
+        final byte version = buffer.get();
+
+        switch (version) {
+            case (byte) 0:
+                final int nTopics = buffer.getInt();
+                final Map<String, Map<Integer, Long>> position = new HashMap<>(nTopics);
+                for (int i = 0; i < nTopics; i++) {
+                    final int topicNameLength = buffer.getInt();
+                    final byte[] topicNameBytes = new byte[topicNameLength];
+                    buffer.get(topicNameBytes);
+                    final String topic = new String(topicNameBytes, StandardCharsets.UTF_8);
+
+                    final int numPairs = buffer.getInt();
+                    final Map<Integer, Long> partitionOffsets = new HashMap<>(numPairs);
+                    for (int j = 0; j < numPairs; j++) {
+                        partitionOffsets.put(buffer.getInt(), buffer.getLong());
+                    }
+                    position.put(topic, partitionOffsets);
+                }
+                return Position.fromMap(position);
+            default:
+                throw new IllegalArgumentException(
+                    "Unknown version " + version + " when deserializing Position"
+                );
+        }
+    }
+
+    private static Map<String, Map<Integer, Long>> deepCopy(
+        final Map<String, Map<Integer, Long>> map) {
+        if (map == null) {
+            return new HashMap<>();
+        } else {
+            final Map<String, Map<Integer, Long>> copy = new HashMap<>(map.size());
+            for (final Entry<String, Map<Integer, Long>> entry : map.entrySet()) {
+                copy.put(entry.getKey(), new HashMap<>(entry.getValue()));
+            }
+            return copy;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Position{" +
+            "position=" + position +
+            '}';
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Position position1 = (Position) o;
+        return Objects.equals(position, position1.position);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(position);
+    }
+}
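
To make the semantics concrete, a small sketch of merging and round-tripping
positions (topic names and offsets are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.streams.query.Position;

    public final class PositionExample {

        public static void main(final String[] args) {
            final Position a = Position.emptyPosition()
                .withComponent("topic-a", 0, 10L);
            final Position b = Position.emptyPosition()
                .withComponent("topic-a", 0, 5L) // lower offset; loses the merge
                .withComponent("topic-b", 1, 7L);

            // merge keeps the maximum offset per topic-partition:
            // {topic-a: {0: 10}, topic-b: {1: 7}}
            final Position merged = a.merge(b);

            // the versioned binary encoding round-trips losslessly
            final ByteBuffer bytes = merged.serialize();
            System.out.println(merged.equals(Position.deserialize(bytes))); // true
        }
    }
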
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
new file mode 100644
index 0000000..a4dbe35
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import java.util.Objects;
+
+public class PositionBound {
+
+    private final Position position;
+    private final boolean unbounded;
+
+    private PositionBound(final Position position, final boolean unbounded) {
+        if (unbounded && position != null) {
+            throw new IllegalArgumentException(
+                "An unbounded PositionBound cannot also carry a position.");
+        }
+        this.position = position;
+        this.unbounded = unbounded;
+    }
+
+    public static PositionBound unbounded() {
+        return new PositionBound(null, true);
+    }
+
+    public static PositionBound at(final Position position) {
+        return new PositionBound(position, false);
+    }
+
+    public boolean isUnbounded() {
+        return unbounded;
+    }
+
+    public Position position() {
+        if (unbounded) {
+            throw new IllegalArgumentException(
+                "Cannot get the position of an unbounded PositionBound.");
+        } else {
+            return position;
+        }
+    }
+
+    @Override
+    public String toString() {
+        if (isUnbounded()) {
+            return "PositionBound{unbounded}";
+        } else {
+            return "PositionBound{position=" + position + '}';
+        }
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final PositionBound that = (PositionBound) o;
+        return unbounded == that.unbounded && Objects.equals(position, that.position);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(position, unbounded);
+    }
+}
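
The two flavors in a sketch (the topic name and offset are illustrative):

    import org.apache.kafka.streams.query.Position;
    import org.apache.kafka.streams.query.PositionBound;

    public final class PositionBoundExamples {

        // Accept whatever state the replica happens to have.
        public static PositionBound any() {
            return PositionBound.unbounded();
        }

        // Demand that the replica has consumed the input topic at least
        // through offset 42 of partition 0.
        public static PositionBound atLeast() {
            return PositionBound.at(
                Position.emptyPosition().withComponent("input-topic", 0, 42L));
        }
    }
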
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/Query.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/Query.java
index 1936d29..988f904 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Query.java
@@ -14,19 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 
-import static java.util.Objects.requireNonNull;
+public interface Query<R> {
 
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
-    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        super(
-            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
-            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
-        );
-    }
-}
\ No newline at end of file
+}
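
Query is a deliberately empty marker interface: stores recognize the concrete
query types they support and report UNKNOWN_QUERY_TYPE otherwise. As a sketch,
here is a custom query a store implementation could opt into (the name and
semantics are hypothetical):

    import org.apache.kafka.streams.query.Query;

    // "How many keys does this partition's store hold?" A store that
    // recognizes this type would answer with QueryResult.forResult(count);
    // all others fall through to QueryResult.forUnknownQueryType.
    public final class KeyCountQuery implements Query<Long> {

        private KeyCountQuery() {
        }

        public static KeyCountQuery keyCount() {
            return new KeyCountQuery();
        }
    }
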
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
new file mode 100644
index 0000000..f3b92d6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position position;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute "
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new query type.");
+    }
+
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    public static <R> QueryResult<R> notActive(
+        final State state,
+        final boolean active,
+        final int partition) {
+        return new QueryResult<>(
+            FailureReason.NOT_ACTIVE,
+            "Query requires a running active task,"
+                + " but partition " + partition + " was in state " + state + " and was "
+                + (active ? "active" : "not active") + "."
+        );
+    }
+
+
+    public <NewR> QueryResult<NewR> swapResult(final NewR typedResult) {
+        final QueryResult<NewR> queryResult = new QueryResult<>(typedResult);
+        queryResult.executionInfo.addAll(executionInfo);
+        queryResult.position = position;
+        return queryResult;
+    }
+
+    public void addExecutionInfo(final String s) {
+        executionInfo.add(s);
+    }
+
+    public void throwIfFailure() {
+        if (isFailure()) {
+            throw new RuntimeException(failureReason.name() + ": " + failure);
+        }
+    }
+
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    public FailureReason getFailureReason() {
+        return failureReason;
+    }
+
+    public String getFailure() {
+        return failure;
+    }
+
+    public R getResult() {
+        if (result == null) {
+            throwIfFailure();
+        }
+        // may legitimately return null when the query succeeded without a result.
+        return result;
+    }
+
+    public void setPosition(final Position position) {
+        this.position = position;
+    }
+
+    public Position getPosition() {
+        return position;
+    }
+
+    @Override
+    public String toString() {
+        return "QueryResult{" +
+            "executionInfo=" + executionInfo +
+            ", failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", result=" + result +
+            ", boundUpdate=" + boundUpdate +
+            '}';
+    }
+}
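
A runnable sketch of the result lifecycle, including the swapResult hand-off the
metered stores below rely on (the literal values are illustrative):

    import org.apache.kafka.streams.query.QueryResult;

    public final class QueryResultExample {

        public static void main(final String[] args) {
            final QueryResult<byte[]> raw = QueryResult.forResult(new byte[] {1});
            raw.addExecutionInfo("fetched from the inner byte store");

            // A wrapping (serde-aware) layer deserializes and swaps the result,
            // carrying the execution info and position along.
            final QueryResult<Integer> typed = raw.swapResult(1);
            System.out.println(typed.isSuccess());        // true
            System.out.println(typed.getResult());        // 1
            System.out.println(typed.getExecutionInfo()); // [fetched from the inner byte store]
        }
    }
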
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
index 1936d29..c42762e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
@@ -14,19 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.common.utils.Bytes;
 
-import static java.util.Objects.requireNonNull;
+public class RawKeyQuery implements Query<byte[]> {
 
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
-    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        super(
-            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
-            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
-        );
+    private final Bytes key;
+
+    private RawKeyQuery(final Bytes key) {
+        this.key = key;
+    }
+
+    public static RawKeyQuery withKey(final Bytes key) {
+        return new RawKeyQuery(key);
+    }
+
+    public static RawKeyQuery withKey(final byte[] key) {
+        return new RawKeyQuery(Bytes.wrap(key));
+    }
+
+    public Bytes getKey() {
+        return key;
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
index 1936d29..d1f14f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
@@ -14,19 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
 
-import static java.util.Objects.requireNonNull;
+public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
 
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
-    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        super(
-            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
-            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
-        );
+    private RawScanQuery() {}
+
+    public static RawScanQuery scan() {
+        return new RawScanQuery();
     }
-}
\ No newline at end of file
+}
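
Both raw queries operate beneath the serde layer, on the store's binary schema;
a sketch of constructing them (executing them requires a store, as the
implementations below show):

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.query.RawKeyQuery;
    import org.apache.kafka.streams.query.RawScanQuery;

    public final class RawQueryExamples {

        // Point lookup on an already-serialized key; the byte[] overload
        // wraps into Bytes internally.
        public static RawKeyQuery lookup(final byte[] serializedKey) {
            return RawKeyQuery.withKey(Bytes.wrap(serializedKey));
        }

        // Full scan, yielding a KeyValueIterator<Bytes, byte[]> on success.
        public static RawScanQuery fullScan() {
            return RawScanQuery.scan();
        }
    }
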
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index f9f0bdc..da7927e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -212,4 +212,13 @@ public final class StateSerdes<K, V> {
                     e);
         }
     }
+
+    @Override
+    public String toString() {
+        return "StateSerdes{" +
+            "topic='" + topic + '\'' +
+            ", keySerde=" + keySerde +
+            ", valueSerde=" + valueSerde +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index bfee6b2..c1f7461 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -27,6 +27,10 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
@@ -279,6 +283,20 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
         return open;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+        if (query instanceof RawKeyQuery) {
+            final Bytes key = ((RawKeyQuery) query).getKey();
+            final byte[] bytes = get(key);
+            return QueryResult.forResult((R) bytes);
+        } else {
+            return QueryResult.forUnknownQueryType(query, this);
+        }
+    }
+
     // Visible for testing
     List<S> getSegments() {
         return segments.allSegments(false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index f0c6dbe..f9bc34e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.slf4j.Logger;
@@ -73,6 +76,15 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return StoreQueryUtils.requireKVQuery(query, this, collectExecutionInfo);
+    }
+
+    @Override
     public synchronized byte[] get(final Bytes key) {
         return map.get(key);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index ba8a745..e13b357 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -40,6 +40,9 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult;
@@ -240,6 +243,15 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     }
 
     @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return QueryResult.forUnknownQueryType(query, this);
+    }
+
+    @Override
     public void close() {
         open = false;
         index.clear();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 22f1215..1bea349 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -22,6 +22,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -108,6 +111,15 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return StoreQueryUtils.requireKVQuery(query, this, collectExecutionInfo);
+    }
+
+    @Override
     public synchronized byte[] get(final Bytes key) {
         Objects.requireNonNull(key);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 937288c..21b8e38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -34,6 +34,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -186,6 +191,38 @@ public class MeteredKeyValueStore<K, V>
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        if (query instanceof KeyQuery) {
+            final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+            final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(keyBytes(typedQuery.getKey()));
+            final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+            if (rawResult.isSuccess()) {
+                final V value = outerValue(rawResult.getResult());
+                final QueryResult<V> typedQueryResult =
+                    rawResult.swapResult(value);
+                result = (QueryResult<R>) typedQueryResult;
+            } else {
+                // the generic type doesn't matter, since failed queries have no result set.
+                result = (QueryResult<R>) rawResult;
+            }
+        } else {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+        }
+        final long end = System.nanoTime();
+        result.addExecutionInfo(
+            "Handled in " + getClass() + " with serdes " + serdes + " in " + (end - start) + "ns");
+        return result;
+    }
+
     @Override
     public V get(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
@@ -324,6 +361,10 @@ public class MeteredKeyValueStore<K, V>
         }
     }
 
+    public StateSerdes<K, V> serdes() {
+        return serdes;
+    }
+
     private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
 
         private final KeyValueIterator<Bytes, byte[]> iter;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index b1eb948..87b3b40 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -372,4 +372,8 @@ public class MeteredSessionStore<K, V>
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
+
+    public StateSerdes<K, V> serdes() {
+        return serdes;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 0970703..1f267bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -33,6 +33,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -63,11 +68,11 @@ public class MeteredWindowStore<K, V>
     private TaskId taskId;
 
     MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
-                       final long windowSizeMs,
-                       final String metricsScope,
-                       final Time time,
-                       final Serde<K> keySerde,
-                       final Serde<V> valueSerde) {
+        final long windowSizeMs,
+        final String metricsScope,
+        final Time time,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde) {
         super(inner);
         this.windowSizeMs = windowSizeMs;
         this.metricsScope = metricsScope;
@@ -87,7 +92,8 @@ public class MeteredWindowStore<K, V>
 
         registerMetrics();
         final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(),
+                streamsMetrics);
 
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
@@ -103,20 +109,26 @@ public class MeteredWindowStore<K, V>
 
         registerMetrics();
         final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+            StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(),
+                streamsMetrics);
 
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
+
     protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final SerdeGetter getter) {
         return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
     }
 
     private void registerMetrics() {
-        putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
-        fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
-        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+        putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(),
+            streamsMetrics);
+        fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(),
+            streamsMetrics);
+        flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(),
+            streamsMetrics);
+        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope,
+            name(), streamsMetrics);
     }
 
     @Deprecated
@@ -126,7 +138,8 @@ public class MeteredWindowStore<K, V>
         serdes = new StateSerdes<>(
             changelogTopic != null ?
                 changelogTopic :
-                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName,
+                    taskId.topologyName()),
             prepareKeySerde(keySerde, new SerdeGetter(context)),
             prepareValueSerde(valueSerde, new SerdeGetter(context)));
     }
@@ -137,7 +150,8 @@ public class MeteredWindowStore<K, V>
         serdes = new StateSerdes<>(
             changelogTopic != null ?
                 changelogTopic :
-                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName,
+                    taskId.topologyName()),
             prepareKeySerde(keySerde, new SerdeGetter(context)),
             prepareValueSerde(valueSerde, new SerdeGetter(context)));
     }
@@ -145,7 +159,7 @@ public class MeteredWindowStore<K, V>
     @SuppressWarnings("unchecked")
     @Override
     public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
-                                    final boolean sendOldValues) {
+        final boolean sendOldValues) {
         final WindowStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
@@ -161,10 +175,43 @@ public class MeteredWindowStore<K, V>
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo
+    ) {
+        if (query instanceof KeyQuery) {
+            final Windowed<K> key = ((KeyQuery<Windowed<K>, V>) query).getKey();
+            final Bytes bytes = keyBytes(key.key());
+            // NOTE: we need to _fully_ serialize the key, since we can't pass in any
+            // extra timestamp information in the RawKeyQuery. So, we go ahead and use the
+            // internal store's binary schema. This works for the provided KS windowed stores,
+            // but a custom store that uses a different schema will have to use the WindowKeySchema
+            // to read the data back out of the array and convert it to whatever the real binary
+            // key is.
+            // seqnum hard-coded to zero since we don't query stream-stream join stores.
+            final Bytes storeKey = WindowKeySchema.toStoreKeyBinary(
+                bytes,
+                key.window().start(),
+                0
+            );
+            final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(storeKey);
+            final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+            if (rawResult.isSuccess()) {
+                final V v = serdes.valueFrom(rawResult.getResult());
+                final QueryResult<V> result = rawResult.swapResult(v);
+                return (QueryResult<R>) result;
+            } else {
+                // failed raw queries carry no result to deserialize
+                return (QueryResult<R>) rawResult;
+            }
+        } else {
+            return super.query(query, positionBound, collectExecutionInfo);
+        }
+    }
+
     @Override
     public void put(final K key,
-                    final V value,
-                    final long windowStartTimestamp) {
+        final V value,
+        final long windowStartTimestamp) {
         Objects.requireNonNull(key, "key cannot be null");
         try {
             maybeMeasureLatency(
@@ -181,7 +228,7 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public V fetch(final K key,
-                   final long timestamp) {
+        final long timestamp) {
         Objects.requireNonNull(key, "key cannot be null");
         return maybeMeasureLatency(
             () -> {
@@ -198,8 +245,8 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public WindowStoreIterator<V> fetch(final K key,
-                                        final long timeFrom,
-                                        final long timeTo) {
+        final long timeFrom,
+        final long timeTo) {
         Objects.requireNonNull(key, "key cannot be null");
         return new MeteredWindowStoreIterator<>(
             wrapped().fetch(keyBytes(key), timeFrom, timeTo),
@@ -212,8 +259,8 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public WindowStoreIterator<V> backwardFetch(final K key,
-                                                final long timeFrom,
-                                                final long timeTo) {
+        final long timeFrom,
+        final long timeTo) {
         Objects.requireNonNull(key, "key cannot be null");
         return new MeteredWindowStoreIterator<>(
             wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo),
@@ -226,9 +273,9 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
-                                                  final K keyTo,
-                                                  final long timeFrom,
-                                                  final long timeTo) {
+        final K keyTo,
+        final long timeFrom,
+        final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().fetch(
                 keyBytes(keyFrom),
@@ -243,9 +290,9 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
-                                                          final K keyTo,
-                                                          final long timeFrom,
-                                                          final long timeTo) {
+        final K keyTo,
+        final long timeFrom,
+        final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().backwardFetch(
                 keyBytes(keyFrom),
@@ -260,7 +307,7 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
-                                                     final long timeTo) {
+        final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().fetchAll(timeFrom, timeTo),
             fetchSensor,
@@ -271,7 +318,7 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom,
-                                                             final long timeTo) {
+        final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(
             wrapped().backwardFetchAll(timeFrom, timeTo),
             fetchSensor,
@@ -282,12 +329,14 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
-        return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
+        return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics,
+            serdes, time);
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardAll() {
-        return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor, streamsMetrics, serdes, time);
+        return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor,
+            streamsMetrics, serdes, time);
     }
 
     @Override
@@ -313,8 +362,12 @@ public class MeteredWindowStore<K, V>
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && context != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency =  currentTime - context.timestamp();
+            final long e2eLatency = currentTime - context.timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }
+
+    public StateSerdes<K, V> serdes() {
+        return serdes;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 07cf0ee..aedb67c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,4 +72,8 @@ public class QueryableStoreProvider {
     public void removeStoreProviderForThread(final String threadName) {
         this.storeProviders.remove(threadName);
     }
+
+    public Collection<StreamThreadStateStoreProvider> getStoreProviders() {
+        return storeProviders.values();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index aa1b1ba..e9ea595 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -28,6 +28,11 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -61,6 +66,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -105,6 +111,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     private final RocksDBMetricsRecorder metricsRecorder;
 
     protected volatile boolean open = false;
+    private StateStoreContext context;
+    private Map<String, Map<Integer, Long>> seenOffsets = new HashMap<>();
 
     RocksDBStore(final String name,
                  final String metricsScope) {
@@ -252,6 +260,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
         context.register(root, new RocksDBBatchingRestoreCallback(this));
+        this.context = context;
     }
 
     @Override
@@ -269,6 +278,29 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         return open;
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        if (context == null) {
+            throw new IllegalStateException("Store is not yet initialized");
+        } else {
+            final int partition = this.context.taskId().partition();
+            if (StoreQueryUtils.isPermitted(seenOffsets, positionBound, partition)) {
+                final QueryResult<R> result = StoreQueryUtils.requireKVQuery(query, this,
+                    collectExecutionInfo);
+                final Position currentPosition = Position.fromMap(seenOffsets);
+                result.setPosition(currentPosition);
+                return result;
+            } else {
+                return QueryResult.notUpToBound(Position.fromMap(seenOffsets), positionBound,
+                    partition);
+            }
+        }
+    }
+
     private void validateStoreOpen() {
         if (!open) {
             throw new InvalidStateStoreException("Store " + name + " is currently closed");
@@ -281,6 +313,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         dbAccessor.put(key.get(), value);
+        // FIXME: record metadata can be null because, when this store is used as a Segment,
+        // we never call init(). Is that correct?
+        // To make this logic work properly for segmented stores, we either need to
+        // track the seen offsets one level up (in the RocksDBSegmentedBytesStore), OR
+        // we need to get a reference to the context here.
+        if (context != null && context.recordMetadata().isPresent()) {
+            final RecordMetadata meta = context.recordMetadata().get();
+            seenOffsets.computeIfAbsent(meta.topic(), t -> new HashMap<>())
+                .put(meta.partition(), meta.offset());
+        }
     }
 
     @Override
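
The put() hook above is the whole position-tracking scheme: on every write,
remember the latest offset applied from each input topic-partition. The same
bookkeeping as a standalone sketch (plain Java with hypothetical names, not
the committed code):

    import java.util.HashMap;
    import java.util.Map;

    public class SeenOffsetsSketch {
        // topic -> (partition -> latest offset applied to the store)
        private final Map<String, Map<Integer, Long>> seenOffsets = new HashMap<>();

        void recordSeen(final String topic, final int partition, final long offset) {
            seenOffsets.computeIfAbsent(topic, t -> new HashMap<>())
                .put(partition, offset);
        }
    }
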
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
new file mode 100644
index 0000000..603257c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Map;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    public static <R> QueryResult<R> requireKVQuery(
+        final Query<R> query,
+        final KeyValueStore<Bytes, byte[]> kvStore,
+        final boolean enableExecutionInfo) {
+        final QueryResult<R> r = StoreQueryUtils.handleKVQuery(query, kvStore, enableExecutionInfo);
+        r.throwIfFailure();
+        return r;
+    }
+
+    public static <R> QueryResult<R> handleKVQuery(
+        final Query<R> query,
+        final KeyValueStore<Bytes, byte[]> kvStore,
+        final boolean enableExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final String name = query.getClass().getCanonicalName();
+        switch (name) {
+            case "org.apache.kafka.streams.query.RawKeyQuery": {
+                final RawKeyQuery keyQuery = (RawKeyQuery) query;
+                return handleRawKeyQuery(kvStore, enableExecutionInfo, start, keyQuery);
+            }
+            case "org.apache.kafka.streams.query.RawScanQuery": {
+                final KeyValueIterator<Bytes, byte[]> iterator = kvStore.all();
+                @SuppressWarnings("unchecked") final R result = (R) iterator;
+                final long end = System.nanoTime();
+                final QueryResult<R> queryResult = QueryResult.forResult(result);
+                if (enableExecutionInfo) {
+                    queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+                        + "#all via StoreQueryUtils in " + (end - start) + "ns");
+                }
+                return queryResult;
+            }
+            default:
+                return QueryResult.forUnknownQueryType(query, kvStore);
+        }
+    }
+
+    public static <R> QueryResult<R> handleRawKeyQuery(
+        final KeyValueStore<Bytes, byte[]> kvStore,
+        final boolean enableExecutionInfo,
+        final long start,
+        final RawKeyQuery keyQuery) {
+
+        final Bytes key = keyQuery.getKey();
+        final byte[] value = kvStore.get(key);
+        @SuppressWarnings("unchecked") final R result = (R) value;
+        final long end = System.nanoTime();
+
+        final QueryResult<R> queryResult = QueryResult.forResult(result);
+        if (enableExecutionInfo) {
+            queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+                + "#get via StoreQueryUtils in " + (end - start) + "ns");
+        }
+        return queryResult;
+    }
+
+    public static boolean isPermitted(
+        final Map<String, Map<Integer, Long>> seenOffsets,
+        final PositionBound positionBound,
+        final int partition) {
+        if (positionBound.isUnbounded()) {
+            return true;
+        } else {
+            final Position position = positionBound.position();
+            for (final String topic : position.getTopics()) {
+                final Map<Integer, Long> partitionBounds = position.getBound(topic);
+                final Map<Integer, Long> seenPartitionBounds = seenOffsets.get(topic);
+                if (!partitionBounds.containsKey(partition)) {
+                    // this topic isn't bounded for our partition, so just skip over it.
+                } else {
+                    if (seenPartitionBounds == null) {
+                        // we have seen no offsets at all for a topic that bounds our partition
+                        return false;
+                    } else if (!seenPartitionBounds.containsKey(partition)) {
+                        // we have seen no offset for a partition that the bound requires
+                        return false;
+                    } else if (seenPartitionBounds.get(partition) < partitionBounds.get(
+                        partition)) {
+                        // our current position is behind the bound
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+    }
+}
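
To make isPermitted() concrete: a bound of {input-topic: {0 -> 5}} is only
satisfied once the store has seen offset 5 on partition 0 of input-topic. A
hedged example with made-up offsets (the PositionBound.at() factory is the one
used by the integration test below):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.query.Position;
    import org.apache.kafka.streams.query.PositionBound;
    import org.apache.kafka.streams.state.internals.StoreQueryUtils;

    public class IsPermittedExample {
        public static void main(final String[] args) {
            final Map<String, Map<Integer, Long>> seenOffsets = new HashMap<>();
            seenOffsets.computeIfAbsent("input-topic", t -> new HashMap<>()).put(0, 4L);

            final PositionBound bound = PositionBound.at(
                Position.emptyPosition().withComponent("input-topic", 0, 5L));

            // partition 0 has only reached offset 4 < 5, so the query is rejected
            System.out.println(StoreQueryUtils.isPermitted(seenOffsets, bound, 0)); // false
        }
    }
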
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index a249a14..f4eb55e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -185,6 +188,18 @@ public class TimestampedKeyValueStoreBuilder<K, V>
         }
 
         @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo) {
+
+            final long start = System.nanoTime();
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+            final long end = System.nanoTime();
+            result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+            return result;
+        }
+
+        @Override
         public String name() {
             return wrapped.name();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index b3727f5..9b28ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -203,6 +206,18 @@ public class TimestampedWindowStoreBuilder<K, V>
         }
 
         @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo) {
+
+            final long start = System.nanoTime();
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+            final long end = System.nanoTime();
+            result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+            return result;
+        }
+
+        @Override
         public String name() {
             return wrapped.name();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index 1936d29..599b6e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -23,10 +23,20 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import static java.util.Objects.requireNonNull;
 
 public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    private final Serde<V> valueSerde;
+
     public ValueAndTimestampSerde(final Serde<V> valueSerde) {
         super(
             new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
             new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
         );
+        this.valueSerde = valueSerde;
+    }
+
+    @Override
+    public String toString() {
+        return "ValueAndTimestampSerde{" +
+            "valueSerde=" + valueSerde +
+            '}';
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index f7999d3..2b626cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -21,6 +21,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -183,6 +186,15 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
         return store.isOpen();
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return QueryResult.forUnknownQueryType(query, this);
+    }
+
 
     private static class WindowToTimestampedWindowIteratorAdapter
         extends KeyValueToTimestampedKeyValueIteratorAdapter<Long>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index e8244f7..e904a48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -20,6 +20,9 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 
 /**
@@ -103,6 +106,21 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
         wrapped.close();
     }
 
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+        final long start = System.nanoTime();
+        final QueryResult<R> result = wrapped().query(query, positionBound, collectExecutionInfo);
+        if (collectExecutionInfo) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " via WrappedStateStore" + " in " + (end - start)
+                    + "ns");
+        }
+        return result;
+    }
+
     public S wrapped() {
         return wrapped;
     }
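
Every store wrapper now forwards query() to wrapped() and, when
collectExecutionInfo is set, appends its own timing entry, so a single query
yields a per-layer trace. The pattern, reduced to a hedged standalone sketch
with hypothetical types (the real code threads Query, QueryResult, and
PositionBound through instead):

    import java.util.List;

    interface Queryable {
        String query(String query, List<String> executionInfo);
    }

    final class TimingWrapper implements Queryable {
        private final Queryable wrapped;

        TimingWrapper(final Queryable wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public String query(final String query, final List<String> executionInfo) {
            final long start = System.nanoTime();
            final String result = wrapped.query(query, executionInfo);
            // each layer appends its own entry, innermost handler first
            executionInfo.add("Handled in " + getClass()
                + " in " + (System.nanoTime() - start) + "ns");
            return result;
        }
    }
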
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
new file mode 100644
index 0000000..23374f3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.InteractiveQueryRequest;
+import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.Iterators;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.query.RawScanQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.query.InteractiveQueryRequest.inStore;
+import static org.apache.kafka.streams.query.PositionBound.at;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String INPUT2_TOPIC_NAME = "input2-topic";
+    private static final String UNCACHED_TABLE = "uncached-table";
+    private static final String UNCACHED_COUNTS_TABLE = "uncached-counts-table";
+    private static final String CACHED_TABLE = "cached-table";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+    private static KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void before() throws InterruptedException, IOException {
+        CLUSTER.start();
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1);
+        CLUSTER.createTopic(INPUT2_TOPIC_NAME, 2, 1);
+
+        final Semaphore semaphore = new Semaphore(0);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder
+            .table(
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(UNCACHED_TABLE)
+                    .withCachingDisabled()
+            )
+            .filter(
+                (a, b) -> true,
+                Materialized.as(CACHED_TABLE)
+            )
+            .toStream()
+            .peek((k, v) -> semaphore.release());
+
+        builder
+            .stream(
+                INPUT2_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer())
+            )
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeAndGrace(
+                Duration.ofMillis(100),
+                Duration.ZERO
+            ))
+            .count(
+                Materialized
+                    .<Integer, Long, WindowStore<Bytes, byte[]>>as(UNCACHED_COUNTS_TABLE)
+                    .withCachingDisabled()
+            )
+            .toStream()
+            .peek((k, v) -> semaphore.release());
+
+        kafkaStreams =
+            IntegrationTestUtils.getRunningStreams(streamsConfiguration(), builder, true);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            INPUT_TOPIC_NAME,
+            Arrays.asList(new KeyValue<>(1, 1), new KeyValue<>(2, 2), new KeyValue<>(3, 3)),
+            producerProps,
+            Time.SYSTEM
+        );
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(3, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+        IntegrationTestUtils.produceSynchronously(
+            producerProps,
+            false,
+            INPUT2_TOPIC_NAME,
+            Optional.empty(),
+            Arrays.asList(
+                new KeyValueTimestamp<>(1, 1, 0),
+                new KeyValueTimestamp<>(1, 1, 10)
+            )
+        );
+
+        // Assert that we processed the second batch (should see both updates, since caching is disabled)
+        assertThat(semaphore.tryAcquire(2, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+
+    }
+
+    @AfterClass
+    public static void after() {
+        kafkaStreams.close(Duration.of(1, ChronoUnit.MINUTES));
+        kafkaStreams.cleanUp();
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void shouldQueryKeyFromCachedTable() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(CACHED_TABLE);
+
+        final byte[] rawKey = serdes.rawKey(1);
+        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+            inStore(CACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
+
+        System.out.println("|||" + result);
+        final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+        final ValueAndTimestamp<Integer> value =
+            serdes.valueFrom(rawValueResult.getResult());
+        System.out.println("|||" + value);
+
+        assertThat(value.value(), is(1));
+        assertThat(result.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldQueryKeyFromUncachedTable() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final byte[] rawKey = serdes.rawKey(1);
+        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+            inStore(UNCACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
+
+        System.out.println("|||" + result);
+        final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+        final ValueAndTimestamp<Integer> value =
+            serdes.valueFrom(rawValueResult.getResult());
+        System.out.println("|||" + value);
+
+        assertThat(value.value(), is(1));
+        assertThat(result.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldQueryTypedKeyFromUncachedTable() {
+        final Integer key = 1;
+
+        final InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
+            inStore(UNCACHED_TABLE).withQuery(KeyQuery.withKey(key));
+
+        final InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);
+
+        final ValueAndTimestamp<Integer> value = result.getOnlyPartitionResult().getResult();
+
+        assertThat(value.value(), is(1));
+        assertThat(result.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void exampleKeyQueryIntoWindowStore() {
+        final Windowed<Integer> key = new Windowed<>(1, new TimeWindow(0L, 99L));
+
+        final InteractiveQueryRequest<ValueAndTimestamp<Long>> query =
+            inStore(UNCACHED_COUNTS_TABLE).withQuery(KeyQuery.withKey(key));
+
+        final InteractiveQueryResult<ValueAndTimestamp<Long>> result = kafkaStreams.query(query);
+
+        final ValueAndTimestamp<Long> value = result.getOnlyPartitionResult().getResult();
+
+        assertThat(value.value(), is(2L));
+    }
+
+    @Test
+    public void shouldScanUncachedTablePartitions() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+            kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
+
+        System.out.println("|||" + scanResult);
+        final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
+            scanResult.getPartitionResults();
+        for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
+            try (final KeyValueIterator<Bytes, byte[]> keyValueIterator =
+                entry.getValue().getResult()) {
+                while (keyValueIterator.hasNext()) {
+                    final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
+                    System.out.println(
+                        "|||" + entry.getKey() +
+                            " " + serdes.keyFrom(next.key.get()) +
+                            " " + serdes.valueFrom(next.value)
+                    );
+                }
+            }
+        }
+
+        assertThat(scanResult.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldScanUncachedTableCollated() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+            kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
+
+        System.out.println("|||" + scanResult);
+        final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = scanResult.getPartitionResults();
+
+        final List<KeyValueIterator<Bytes, byte[]>> collect =
+            partitionResults
+                .values()
+                .stream()
+                .map(QueryResult::getResult)
+                .collect(Collectors.toList());
+        try (final CloseableIterator<KeyValue<Bytes, byte[]>> collate = Iterators.collate(
+            collect)) {
+            while (collate.hasNext()) {
+                final KeyValue<Bytes, byte[]> next = collate.next();
+                System.out.println(
+                    "|||" +
+                        " " + serdes.keyFrom(next.key.get()) +
+                        " " + serdes.valueFrom(next.value)
+                );
+            }
+        }
+
+        assertThat(scanResult.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldQueryWithinBound() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final byte[] rawKey = serdes.rawKey(1);
+        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+            inStore(UNCACHED_TABLE)
+                .withQuery(RawKeyQuery.withKey(rawKey))
+                .withPositionBound(
+                    at(
+                        Position
+                            .emptyPosition()
+                            .withComponent(INPUT_TOPIC_NAME, 0, 0L)
+                            .withComponent(INPUT_TOPIC_NAME, 1, 1L)
+                    )
+                )
+        );
+
+        System.out.println("|||" + result);
+        final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+        final ValueAndTimestamp<Integer> value =
+            serdes.valueFrom(rawValueResult.getResult());
+        System.out.println("|||" + value);
+        assertThat(result.getPosition(),
+            is(Position.fromMap(
+                mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+    }
+
+    @Test
+    public void shouldFailQueryOutsideOfBound() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final byte[] rawKey = serdes.rawKey(1);
+        // intentionally setting the bound higher than the current position.
+        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+            inStore(UNCACHED_TABLE)
+                .withQuery(RawKeyQuery.withKey(rawKey))
+                .withPositionBound(
+                    at(
+                        Position
+                            .emptyPosition()
+                            .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                            .withComponent(INPUT_TOPIC_NAME, 1, 2L)
+                    )
+                )
+        );
+
+        System.out.println("|||" + result);
+        final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+
+        final RuntimeException runtimeException = assertThrows(
+            RuntimeException.class,
+            rawValueResult::getResult
+        );
+        assertThat(
+            runtimeException.getMessage(),
+            is("NOT_UP_TO_BOUND: For store partition 0, the current position Position{position={input-topic={0=0}}} is not yet up to the bound PositionBound{position=Position{position={input-topic={0=1, 1=2}}}}"));
+
+        assertThat(rawValueResult.isFailure(), is(true));
+        assertThat(rawValueResult.getFailureReason(), is(FailureReason.NOT_UP_TO_BOUND));
+        assertThat(rawValueResult.getFailure(),
+            is("For store partition 0, the current position Position{position={input-topic={0=0}}} is not yet up to the bound PositionBound{position=Position{position={input-topic={0=1, 1=2}}}}"));
+        assertThat(result.getPosition(), is(Position.emptyPosition()));
+    }
+
+
+    @Test
+    public void shouldPartiallySucceedOnPartialBound() {
+
+        final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+            kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+            kafkaStreams.query(
+                inStore(UNCACHED_TABLE)
+                    .withQuery(RawScanQuery.scan())
+                    .withPositionBound(
+                        at(
+                            Position
+                                .emptyPosition()
+                                .withComponent(INPUT_TOPIC_NAME, 0, 0L)
+                                .withComponent(INPUT_TOPIC_NAME, 1, 2L)
+                        )
+                    )
+            );
+
+        System.out.println("|||" + scanResult);
+        final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = scanResult.getPartitionResults();
+        for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
+            final QueryResult<KeyValueIterator<Bytes, byte[]>> value = entry.getValue();
+            if (value.isSuccess()) {
+                try (final KeyValueIterator<Bytes, byte[]> keyValueIterator =
+                    value.getResult()) {
+                    while (keyValueIterator.hasNext()) {
+                        final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
+                        System.out.println(
+                            "|||" + entry.getKey() +
+                                " " + serdes.keyFrom(next.key.get()) +
+                                " " + serdes.valueFrom(next.value)
+                        );
+                    }
+                }
+            }
+        }
+
+        assertThat(scanResult.getPartitionResults().get(0).isSuccess(), is(true));
+        assertThat(scanResult.getPartitionResults().get(1).isFailure(), is(true));
+        assertThat(scanResult.getPartitionResults().get(1).getFailureReason(),
+            is(FailureReason.NOT_UP_TO_BOUND));
+        assertThat(scanResult.getPosition(),
+            is(Position.fromMap(mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L)))))));
+    }
+
+    private static Properties streamsConfiguration() {
+        final String safeTestName = IQv2IntegrationTest.class.getName();
+        final Properties config = new Properties();
+        config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        return config;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 066f080..3772904 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -71,6 +71,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
@@ -786,6 +787,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
         EasyMock.expect(context.appConfigs())
             .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.expect(context.recordMetadata()).andReturn(Optional.empty());
         EasyMock.replay(context);
 
         rocksDBStore.init((StateStoreContext) context, rocksDBStore);
@@ -818,6 +820,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
         EasyMock.expect(context.appConfigs())
                 .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.expect(context.recordMetadata()).andReturn(Optional.empty());
         EasyMock.replay(context);
 
         rocksDBStore.init((StateStoreContext) context, rocksDBStore);

[kafka] 02/03: formalize Position

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit da055cc712646d32f02d5bf3ea0305a649cf4151
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Nov 23 16:49:52 2021 -0600

    formalize Position
---
 .../org/apache/kafka/streams/query/Position.java   | 120 +++---------
 .../apache/kafka/streams/query/PositionTest.java   | 215 +++++++++++++++++++++
 2 files changed, 243 insertions(+), 92 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
index 5b0e981..4c613df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.query;
 
 
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -26,38 +28,45 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
+@Evolving
 public class Position {
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
 
-    private final Map<String, Map<Integer, Long>> position;
-
-    private Position(final Map<String, Map<Integer, Long>> position) {
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
         this.position = position;
     }
 
     public static Position emptyPosition() {
-        return new Position(new HashMap<>());
+        return new Position(new ConcurrentHashMap<>());
     }
 
-    public static Position fromMap(final Map<String, Map<Integer, Long>> map) {
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
         return new Position(deepCopy(map));
     }
 
     public Position withComponent(final String topic, final int partition, final long offset) {
-        final Map<String, Map<Integer, Long>> updated = deepCopy(position);
-        updated.computeIfAbsent(topic, k -> new HashMap<>()).put(partition, offset);
-        return new Position(updated);
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .put(partition, offset);
+        return this;
+    }
+
+    public Position copy() {
+        return new Position(deepCopy(position));
     }
 
     public Position merge(final Position other) {
         if (other == null) {
             return this;
         } else {
-            final Map<String, Map<Integer, Long>> copy = deepCopy(position);
-            for (final Entry<String, Map<Integer, Long>> entry : other.position.entrySet()) {
+            final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
+                deepCopy(position);
+            for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
                 final String topic = entry.getKey();
                 final Map<Integer, Long> partitionMap =
-                    copy.computeIfAbsent(topic, k -> new HashMap<>());
+                    copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
                 for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
                     final Integer partition = partitionOffset.getKey();
                     final Long offset = partitionOffset.getValue();
@@ -79,88 +88,15 @@ public class Position {
         return Collections.unmodifiableMap(position.get(topic));
     }
 
-    public ByteBuffer serialize() {
-        final byte version = (byte) 0;
-
-        int arraySize = Byte.SIZE; // version
-
-        final int nTopics = position.size();
-        arraySize += Integer.SIZE;
-
-        final ArrayList<Entry<String, Map<Integer, Long>>> entries =
-            new ArrayList<>(position.entrySet());
-        final byte[][] topics = new byte[entries.size()][];
-
-        for (int i = 0; i < nTopics; i++) {
-            final Entry<String, Map<Integer, Long>> entry = entries.get(i);
-            final byte[] topicBytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
-            topics[i] = topicBytes;
-            arraySize += Integer.SIZE; // topic name length
-            arraySize += topicBytes.length; // topic name itself
-
-            final Map<Integer, Long> partitionOffsets = entry.getValue();
-            arraySize += Integer.SIZE; // Number of PartitionOffset pairs
-            arraySize += (Integer.SIZE + Long.SIZE)
-                * partitionOffsets.size(); // partitionOffsets themselves
-        }
-
-        final ByteBuffer buffer = ByteBuffer.allocate(arraySize);
-        buffer.put(version);
-
-        buffer.putInt(nTopics);
-        for (int i = 0; i < nTopics; i++) {
-            buffer.putInt(topics[i].length);
-            buffer.put(topics[i]);
-
-            final Entry<String, Map<Integer, Long>> entry = entries.get(i);
-            final Map<Integer, Long> partitionOffsets = entry.getValue();
-            buffer.putInt(partitionOffsets.size());
-            for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
-                buffer.putInt(partitionOffset.getKey());
-                buffer.putLong(partitionOffset.getValue());
-            }
-        }
-
-        buffer.flip();
-        return buffer;
-    }
-
-    public static Position deserialize(final ByteBuffer buffer) {
-        final byte version = buffer.get();
-
-        switch (version) {
-            case (byte) 0:
-                final int nTopics = buffer.getInt();
-                final Map<String, Map<Integer, Long>> position = new HashMap<>(nTopics);
-                for (int i = 0; i < nTopics; i++) {
-                    final int topicNameLength = buffer.getInt();
-                    final byte[] topicNameBytes = new byte[topicNameLength];
-                    buffer.get(topicNameBytes);
-                    final String topic = new String(topicNameBytes, StandardCharsets.UTF_8);
-
-                    final int numPairs = buffer.getInt();
-                    final Map<Integer, Long> partitionOffsets = new HashMap<>(numPairs);
-                    for (int j = 0; j < numPairs; j++) {
-                        partitionOffsets.put(buffer.getInt(), buffer.getLong());
-                    }
-                    position.put(topic, partitionOffsets);
-                }
-                return Position.fromMap(position);
-            default:
-                throw new IllegalArgumentException(
-                    "Unknown version " + version + " when deserializing Position"
-                );
-        }
-    }
-
-    private static Map<String, Map<Integer, Long>> deepCopy(
-        final Map<String, Map<Integer, Long>> map) {
+    private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy(
+        final Map<String, ? extends Map<Integer, Long>> map) {
         if (map == null) {
-            return new HashMap<>();
+            return new ConcurrentHashMap<>();
         } else {
-            final Map<String, Map<Integer, Long>> copy = new HashMap<>(map.size());
-            for (final Entry<String, Map<Integer, Long>> entry : map.entrySet()) {
-                copy.put(entry.getKey(), new HashMap<>(entry.getValue()));
+            final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
+                new ConcurrentHashMap<>(map.size());
+            for (final Entry<String, ? extends Map<Integer, Long>> entry : map.entrySet()) {
+                copy.put(entry.getKey(), new ConcurrentHashMap<>(entry.getValue()));
             }
             return copy;
         }
@@ -187,6 +123,6 @@ public class Position {
 
     @Override
     public int hashCode() {
-        return Objects.hash(position);
+        throw new UnsupportedOperationException("This mutable object is not suitable as a hash key");
     }
 }
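
The reworked Position is now a mutable, thread-safe accumulator backed by
ConcurrentHashMaps: withComponent() updates the instance in place (returning
it for chaining), copy() takes an independent deep snapshot, and hashCode()
deliberately throws, since a mutable object is unsafe as a hash key. A short
usage sketch grounded in the API above:

    import org.apache.kafka.streams.query.Position;

    public class PositionExample {
        public static void main(final String[] args) {
            final Position live = Position.emptyPosition()
                .withComponent("input-topic", 0, 5L);  // mutates and returns `live`

            final Position snapshot = live.copy();     // independent deep copy

            live.withComponent("input-topic", 0, 6L);  // advances `live` only

            System.out.println(snapshot.getBound("input-topic"));  // {0=5}
            System.out.println(live.getBound("input-topic"));      // {0=6}
        }
    }
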
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
new file mode 100644
index 0000000..0a0cf5c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+public class PositionTest {
+
+    @Test
+    public void shouldCreateFromMap() {
+        final Map<String, Map<Integer, Long>> map = mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        );
+
+        final Position position = Position.fromMap(map);
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+
+        // The position should hold a copy of the constructor map,
+
+        map.get("topic1").put(99, 99L);
+
+        // so mutating the input map leaves the position unchanged
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldCreateFromNullMap() {
+        final Position position = Position.fromMap(null);
+        assertThat(position.getTopics(), equalTo(Collections.emptySet()));
+    }
+
+    @Test
+    public void shouldMerge() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position position1 = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset
+            mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition
+            mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic
+        ));
+
+        final Position merged = position.merge(position1);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+    }
+
+    @Test
+    public void shouldCopy() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position copy = position.copy();
+
+        // mutate original
+        position.withComponent("topic", 0, 6L);
+        position.withComponent("topic1", 8, 1L);
+        position.withComponent("topic2", 2, 4L);
+
+        // copy has not changed
+        assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(copy.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+
+        // original has changed
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
+    }
+
+    @Test
+    public void shouldMergeNull() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position merged = position.merge(null);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldMatchOnEqual() {
+        final Position position1 = Position.emptyPosition();
+        final Position position2 = Position.emptyPosition();
+        position1.withComponent("topic1", 0, 1);
+        position2.withComponent("topic1", 0, 1);
+
+        position1.withComponent("topic1", 1, 2);
+        position2.withComponent("topic1", 1, 2);
+
+        position1.withComponent("topic1", 2, 1);
+        position2.withComponent("topic1", 2, 1);
+
+        position1.withComponent("topic2", 0, 0);
+        position2.withComponent("topic2", 0, 0);
+
+        assertEquals(position1, position2);
+    }
+
+    @Test
+    public void shouldNotMatchOnUnEqual() {
+        final Position position1 = Position.emptyPosition();
+        final Position position2 = Position.emptyPosition();
+        position1.withComponent("topic1", 0, 1);
+        position2.withComponent("topic1", 0, 1);
+
+        position1.withComponent("topic1", 1, 2);
+
+        position1.withComponent("topic1", 2, 1);
+        position2.withComponent("topic1", 2, 1);
+
+        position1.withComponent("topic2", 0, 0);
+        position2.withComponent("topic2", 0, 0);
+
+        assertNotEquals(position1, position2);
+    }
+
+    @Test
+    public void shouldNotMatchNull() {
+        final Position position = Position.emptyPosition();
+        assertNotEquals(position, null);
+    }
+
+    @Test
+    public void shouldMatchSelf() {
+        final Position position = Position.emptyPosition();
+        assertEquals(position, position);
+    }
+
+    @Test
+    public void shouldNotHash() {
+        final Position position = Position.emptyPosition();
+        assertThrows(UnsupportedOperationException.class, position::hashCode);
+
+        // going overboard...
+        final HashSet<Position> set = new HashSet<>();
+        assertThrows(UnsupportedOperationException.class, () -> set.add(position));
+
+        final HashMap<Position, Integer> map = new HashMap<>();
+        assertThrows(UnsupportedOperationException.class, () -> map.put(position, 5));
+    }
+}