You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/01/03 04:25:25 UTC

[kafka] branch trunk updated: KAFKA-13494: WindowKeyQuery and WindowRangeQuery (#11567)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8f1cf1  KAFKA-13494: WindowKeyQuery and WindowRangeQuery (#11567)
b8f1cf1 is described below

commit b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c
Author: Patrick Stuedi <ps...@confluent.io>
AuthorDate: Mon Jan 3 05:17:38 2022 +0100

    KAFKA-13494: WindowKeyQuery and WindowRangeQuery (#11567)
    
    Implement WindowKeyQuery and WindowRangeQuery as
    proposed in KIP-806
    
    Reviewer: John Roesler <vv...@apache.org>
---
 .../apache/kafka/streams/query/QueryResult.java    |   2 +-
 .../apache/kafka/streams/query/WindowKeyQuery.java |  67 ++++
 .../kafka/streams/query/WindowRangeQuery.java      |  69 ++++
 .../streams/query/internals/FailedQueryResult.java |  10 +
 .../query/internals/SucceededQueryResult.java      |   9 +
 .../state/internals/InMemoryKeyValueStore.java     |  19 +-
 .../state/internals/InMemorySessionStore.java      |   6 +-
 .../state/internals/InMemoryWindowStore.java       |   6 +-
 .../state/internals/MeteredKeyValueStore.java      |  32 +-
 .../state/internals/MeteredSessionStore.java       | 117 +++++-
 .../state/internals/MeteredWindowStore.java        | 203 +++++++++-
 .../internals/MeteredWindowStoreIterator.java      |  11 +-
 .../internals/MeteredWindowedKeyValueIterator.java |  16 +-
 .../state/internals/RocksDBSessionStore.java       |   6 +-
 .../state/internals/RocksDBWindowStore.java        |   6 +-
 .../streams/state/internals/StoreQueryUtils.java   | 136 +++++++
 .../integration/IQv2StoreIntegrationTest.java      | 418 +++++++++++++++++++--
 17 files changed, 1031 insertions(+), 102 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
index 5a5987f..d8d535d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -58,7 +58,7 @@ public interface QueryResult<R> {
         final Query<R> query,
         final StateStore store) {
 
-        return new FailedQueryResult<>(
+        return forFailure(
             FailureReason.UNKNOWN_QUERY_TYPE,
             "This store (" + store.getClass() + ") doesn't know how to execute "
                 + "the given query (" + query + ")." +
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java
new file mode 100644
index 0000000..79e9dc8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+import java.util.Optional;
+
+@Evolving
+public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {
+
+    private final K key;
+    private final Optional<Instant> timeFrom;
+    private final Optional<Instant> timeTo;
+
+    private WindowKeyQuery(final K key,
+                           final Optional<Instant> timeTo,
+                           final Optional<Instant> timeFrom) {
+        this.key = key;
+        this.timeFrom = timeFrom;
+        this.timeTo = timeTo;
+    }
+
+    public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowStartRange(final K key,
+                                                                         final Instant timeFrom,
+                                                                         final Instant timeTo) {
+        return new WindowKeyQuery<>(key, Optional.of(timeFrom), Optional.of(timeTo));
+    }
+
+    public K getKey() {
+        return key;
+    }
+
+    public Optional<Instant> getTimeFrom() {
+        return timeFrom;
+    }
+
+    public Optional<Instant> getTimeTo() {
+        return timeTo;
+    }
+
+    @Override
+    public String toString() {
+        return "WindowKeyQuery{" +
+            "key=" + key +
+            ", timeFrom=" + timeFrom +
+            ", timeTo=" + timeTo +
+            '}';
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/WindowRangeQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/WindowRangeQuery.java
new file mode 100644
index 0000000..fb658d7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/WindowRangeQuery.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.time.Instant;
+import java.util.Optional;
+
+public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
+
+    private final Optional<K> key;
+    private final Optional<Instant> timeFrom;
+    private final Optional<Instant> timeTo;
+
+    private WindowRangeQuery(final Optional<K> key,
+                             final Optional<Instant> timeFrom,
+                             final Optional<Instant> timeTo) {
+        this.key = key;
+        this.timeFrom = timeFrom;
+        this.timeTo = timeTo;
+    }
+
+    public static <K, V> WindowRangeQuery<K, V> withKey(final K key) {
+        return new WindowRangeQuery<>(Optional.of(key), Optional.empty(), Optional.empty());
+    }
+
+    public static <K, V> WindowRangeQuery<K, V> withWindowStartRange(final Instant timeFrom,
+                                                                     final Instant timeTo) {
+        return new WindowRangeQuery<>(Optional.empty(), Optional.of(timeFrom), Optional.of(timeTo));
+    }
+
+    public Optional<K> getKey() {
+        return key;
+    }
+
+    public Optional<Instant> getTimeFrom() {
+        return timeFrom;
+    }
+
+    public Optional<Instant> getTimeTo() {
+        return timeTo;
+    }
+
+    @Override
+    public String toString() {
+        return "WindowRangeQuery{" +
+            "key=" + key +
+            ", timeFrom=" + timeFrom +
+            ", timeTo=" + timeTo +
+            '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java
index f49cb6b..97860de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java
@@ -86,4 +86,14 @@ public final class FailedQueryResult<R>
             "Cannot get result for failed query. Failure is " + failureReason.name() + ": "
                 + failure);
     }
+
+    @Override
+    public String toString() {
+        return "FailedQueryResult{" +
+            "failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", executionInfo=" + getExecutionInfo() +
+            ", position=" + getPosition() +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java
index 53050a5..8787ff8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java
@@ -99,4 +99,13 @@ public final class SucceededQueryResult<R>
     public R getResult() {
         return result;
     }
+
+    @Override
+    public String toString() {
+        return "SucceededQueryResult{" +
+            "result=" + result +
+            ", executionInfo=" + getExecutionInfo() +
+            ", position=" + getPosition() +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index f5f75ab..061cd85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
-import org.apache.kafka.streams.query.RangeQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.slf4j.Logger;
@@ -93,20 +92,10 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public <R> QueryResult<R> query(
-        final Query<R> query,
-        final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
-
-        if (query instanceof RangeQuery) {
-            final RangeQuery<Bytes, byte[]> typedQuery = (RangeQuery<Bytes, byte[]>) query;
-            final KeyValueIterator<Bytes, byte[]> keyValueIterator =  this.range(
-                    typedQuery.getLowerBound().orElse(null), typedQuery.getUpperBound().orElse(null));
-            final R result = (R) keyValueIterator;
-            final QueryResult<R> queryResult = QueryResult.forResult(result);
-            return queryResult;
-        }
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
         return StoreQueryUtils.handleBasicQueries(
             query,
             positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index cf351e3..03761cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -315,8 +315,10 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
         return StoreQueryUtils.handleBasicQueries(
             query,
             positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 1b49e9e..ce14dc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -351,8 +351,10 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
         return StoreQueryUtils.handleBasicQueries(
             query,
             positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 76679af..cd5bd5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -35,12 +34,12 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -51,11 +50,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
 
 /**
  * A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
@@ -269,7 +270,7 @@ public class MeteredKeyValueStore<K, V>
             final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
                 iterator,
                 getSensor,
-                getValueDeserializer()
+                getDeserializeValue(serdes, wrapped())
             );
             final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
                 InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
@@ -296,8 +297,8 @@ public class MeteredKeyValueStore<K, V>
         final QueryResult<byte[]> rawResult =
             wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
         if (rawResult.isSuccess()) {
-            final Deserializer<V> deserializer = getValueDeserializer();
-            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final Function<byte[], V> deserializer = getDeserializeValue(serdes, wrapped());
+            final V value = deserializer.apply(rawResult.getResult());
             final QueryResult<V> typedQueryResult =
                 InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
             result = (QueryResult<R>) typedQueryResult;
@@ -308,21 +309,6 @@ public class MeteredKeyValueStore<K, V>
         return result;
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private Deserializer<V> getValueDeserializer() {
-        final Serde<V> valueSerde = serdes.valueSerde();
-        final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
-        final Deserializer<V> deserializer;
-        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
-            final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
-                (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) valueSerde).deserializer();
-            deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
-        } else {
-            deserializer = valueSerde.deserializer();
-        }
-        return deserializer;
-    }
-
     @Override
     public V get(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
@@ -507,11 +493,11 @@ public class MeteredKeyValueStore<K, V>
         private final KeyValueIterator<Bytes, byte[]> iter;
         private final Sensor sensor;
         private final long startNs;
-        private final Deserializer<V> valueDeserializer;
+        private final Function<byte[], V> valueDeserializer;
 
         private MeteredKeyValueTimestampedIterator(final KeyValueIterator<Bytes, byte[]> iter,
                                         final Sensor sensor,
-                                        final Deserializer<V> valueDeserializer) {
+                                        final Function<byte[], V> valueDeserializer) {
             this.iter = iter;
             this.sensor = sensor;
             this.valueDeserializer = valueDeserializer;
@@ -528,7 +514,7 @@ public class MeteredKeyValueStore<K, V>
             final KeyValue<Bytes, byte[]> keyValue = iter.next();
             return KeyValue.pair(
                     serdes.keyFrom(keyValue.key.get()),
-                    valueDeserializer.deserialize(serdes.topic(), keyValue.value));
+                    valueDeserializer.apply(keyValue.value));
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index b1eb948..1425b05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -33,13 +33,22 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
+import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
 public class MeteredSessionStore<K, V>
@@ -60,6 +69,15 @@ public class MeteredSessionStore<K, V>
     private InternalProcessorContext<?, ?> context;
     private TaskId taskId;
 
+    @SuppressWarnings("rawtypes")
+    private final Map<Class, QueryHandler> queryHandlers =
+            mkMap(
+                    mkEntry(
+                            WindowRangeQuery.class,
+                            (query, positionBound, collectExecutionInfo, store) -> runRangeQuery(query, positionBound, collectExecutionInfo)
+                    )
+            );
+
 
     MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
                         final String metricsScope,
@@ -230,7 +248,8 @@ public class MeteredSessionStore<K, V>
             wrapped().fetch(keyBytes(key)),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time);
     }
 
@@ -241,7 +260,8 @@ public class MeteredSessionStore<K, V>
             wrapped().backwardFetch(keyBytes(key)),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time
         );
     }
@@ -253,7 +273,8 @@ public class MeteredSessionStore<K, V>
             wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time);
     }
 
@@ -264,7 +285,8 @@ public class MeteredSessionStore<K, V>
             wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time
         );
     }
@@ -282,7 +304,8 @@ public class MeteredSessionStore<K, V>
                 latestSessionStartTime),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time);
     }
 
@@ -300,7 +323,8 @@ public class MeteredSessionStore<K, V>
             ),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time
         );
     }
@@ -320,7 +344,8 @@ public class MeteredSessionStore<K, V>
                 latestSessionStartTime),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time);
     }
 
@@ -340,7 +365,8 @@ public class MeteredSessionStore<K, V>
             ),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time
         );
     }
@@ -359,6 +385,81 @@ public class MeteredSessionStore<K, V>
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (time.nanoseconds() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
+        if (typedQuery.getKey().isPresent()) {
+            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+                WindowRangeQuery.withKey(
+                    Bytes.wrap(serdes.rawKey(typedQuery.getKey().get()))
+                );
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+            if (rawResult.isSuccess()) {
+                final MeteredWindowedKeyValueIterator<K, V> typedResult =
+                    new MeteredWindowedKeyValueIterator<>(
+                        rawResult.getResult(),
+                        fetchSensor,
+                        streamsMetrics,
+                        serdes::keyFrom,
+                        StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
+                        time
+                    );
+                final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
+                    QueryResult.forResult(typedResult);
+                result = (QueryResult<R>) typedQueryResult;
+            } else {
+                // the generic type doesn't matter, since failed queries have no result set.
+                result = (QueryResult<R>) rawResult;
+            }
+        } else {
+
+            result = QueryResult.forFailure(
+                FailureReason.UNKNOWN_QUERY_TYPE,
+                "This store (" + getClass() + ") doesn't know how to"
+                    + " execute the given query (" + query + ") because"
+                    + " SessionStores only support WindowRangeQuery.withKey."
+                    + " Contact the store maintainer if you need support"
+                    + " for a new query type."
+            );
+        }
+        return result;
+    }
+
     private Bytes keyBytes(final K key) {
         return key == null ? null : Bytes.wrap(serdes.rawKey(key));
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 0970703..605e821 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -33,16 +33,27 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
+import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
 
 public class MeteredWindowStore<K, V>
     extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
@@ -62,6 +73,27 @@ public class MeteredWindowStore<K, V>
     private InternalProcessorContext<?, ?> context;
     private TaskId taskId;
 
+    @SuppressWarnings("rawtypes")
+    private final Map<Class, QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                WindowRangeQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> runRangeQuery(
+                    query,
+                    positionBound,
+                    collectExecutionInfo
+                )
+            ),
+            mkEntry(
+                WindowKeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> runKeyQuery(
+                    query,
+                    positionBound,
+                    collectExecutionInfo
+                )
+            )
+        );
+
     MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
                        final long windowSizeMs,
                        final String metricsScope,
@@ -205,7 +237,7 @@ public class MeteredWindowStore<K, V>
             wrapped().fetch(keyBytes(key), timeFrom, timeTo),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::valueFrom,
             time
         );
     }
@@ -219,7 +251,7 @@ public class MeteredWindowStore<K, V>
             wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::valueFrom,
             time
         );
     }
@@ -237,7 +269,8 @@ public class MeteredWindowStore<K, V>
                 timeTo),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time);
     }
 
@@ -254,7 +287,8 @@ public class MeteredWindowStore<K, V>
                 timeTo),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time);
     }
 
@@ -265,7 +299,8 @@ public class MeteredWindowStore<K, V>
             wrapped().fetchAll(timeFrom, timeTo),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time);
     }
 
@@ -276,18 +311,33 @@ public class MeteredWindowStore<K, V>
             wrapped().backwardFetchAll(timeFrom, timeTo),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,
             time);
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
-        return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
+        return new MeteredWindowedKeyValueIterator<>(
+            wrapped().all(),
+            fetchSensor,
+            streamsMetrics,
+            serdes::keyFrom,
+            serdes::valueFrom,
+            time
+        );
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardAll() {
-        return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor, streamsMetrics, serdes, time);
+        return new MeteredWindowedKeyValueIterator<>(
+            wrapped().backwardAll(),
+            fetchSensor,
+            streamsMetrics,
+            serdes::keyFrom,
+            serdes::valueFrom,
+            time
+        );
     }
 
     @Override
@@ -304,10 +354,147 @@ public class MeteredWindowStore<K, V>
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (time.nanoseconds() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
+        // There's no store API for open time ranges
+        if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
+            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+                WindowRangeQuery.withWindowStartRange(
+                    typedQuery.getTimeFrom().get(),
+                    typedQuery.getTimeTo().get()
+                );
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
+                wrapped().query(
+                    rawKeyQuery,
+                    positionBound,
+                    collectExecutionInfo
+                );
+            if (rawResult.isSuccess()) {
+                final MeteredWindowedKeyValueIterator<K, V> typedResult =
+                    new MeteredWindowedKeyValueIterator<>(
+                        rawResult.getResult(),
+                        fetchSensor,
+                        streamsMetrics,
+                        serdes::keyFrom,
+                        getDeserializeValue(serdes, wrapped()),
+                        time
+                    );
+                final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
+                    QueryResult.forResult(
+                        typedResult
+                    );
+                result = (QueryResult<R>) typedQueryResult;
+            } else {
+                // the generic type doesn't matter, since failed queries have no result set.
+                result = (QueryResult<R>) rawResult;
+            }
+        } else {
+
+            result = QueryResult.forFailure(
+                FailureReason.UNKNOWN_QUERY_TYPE,
+                "This store (" + getClass() + ") doesn't know how to"
+                    + " execute the given query (" + query + ") because"
+                    + " WindowStores only supports WindowRangeQuery.withWindowStartRange."
+                    + " Contact the store maintainer if you need support"
+                    + " for a new query type."
+            );
+        }
+        return result;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> queryResult;
+        final WindowKeyQuery<K, V> typedQuery = (WindowKeyQuery<K, V>) query;
+        // There's no store API for open time ranges
+        if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
+            final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
+                WindowKeyQuery.withKeyAndWindowStartRange(
+                    keyBytes(typedQuery.getKey()),
+                    typedQuery.getTimeFrom().get(),
+                    typedQuery.getTimeTo().get()
+                );
+            final QueryResult<WindowStoreIterator<byte[]>> rawResult = wrapped().query(
+                rawKeyQuery,
+                positionBound,
+                collectExecutionInfo
+            );
+            if (rawResult.isSuccess()) {
+                final MeteredWindowStoreIterator<V> typedResult = new MeteredWindowStoreIterator<>(
+                    rawResult.getResult(),
+                    fetchSensor,
+                    streamsMetrics,
+                    getDeserializeValue(serdes, wrapped()),
+                    time
+                );
+                final QueryResult<MeteredWindowStoreIterator<V>> typedQueryResult =
+                    QueryResult.forResult(
+                        typedResult
+                    );
+                queryResult = (QueryResult<R>) typedQueryResult;
+            } else {
+                // the generic type doesn't matter, since failed queries have no result set.
+                queryResult = (QueryResult<R>) rawResult;
+            }
+        } else {
+
+            queryResult = QueryResult.forFailure(
+                FailureReason.UNKNOWN_QUERY_TYPE,
+                "This store (" + getClass() + ") doesn't know how to execute"
+                    + " the given query (" + query + ") because it only supports closed-range"
+                    + " queries."
+                    + " Contact the store maintainer if you need support for a new query type."
+            );
+        }
+        return queryResult;
+    }
+
     private Bytes keyBytes(final K key) {
         return Bytes.wrap(serdes.rawKey(key));
     }
 
+    protected V outerValue(final byte[] value) {
+        return value != null ? serdes.valueFrom(value) : null;
+    }
+
     private void maybeRecordE2ELatency() {
         // Context is null if the provided context isn't an implementation of InternalProcessorContext.
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
index 98bc655..b368b7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
@@ -20,27 +20,28 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
+import java.util.function.Function;
+
 class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
 
     private final WindowStoreIterator<byte[]> iter;
     private final Sensor sensor;
     private final StreamsMetrics metrics;
-    private final StateSerdes<?, V> serdes;
+    private final Function<byte[], V> valueFrom;
     private final long startNs;
     private final Time time;
 
     MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
                                final Sensor sensor,
                                final StreamsMetrics metrics,
-                               final StateSerdes<?, V> serdes,
+                               final Function<byte[], V> valueFrom,
                                final Time time) {
         this.iter = iter;
         this.sensor = sensor;
         this.metrics = metrics;
-        this.serdes = serdes;
+        this.valueFrom = valueFrom;
         this.startNs = time.nanoseconds();
         this.time = time;
     }
@@ -53,7 +54,7 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
     @Override
     public KeyValue<Long, V> next() {
         final KeyValue<Long, byte[]> next = iter.next();
-        return KeyValue.pair(next.key, serdes.valueFrom(next.value));
+        return KeyValue.pair(next.key, valueFrom.apply(next.value));
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
index 411871d..6809380 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
@@ -23,26 +23,30 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.function.Function;
 
 class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
 
     private final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
     private final Sensor sensor;
     private final StreamsMetrics metrics;
-    private final StateSerdes<K, V> serdes;
+    private final Function<byte[], K> deserializeKey;
+    private final Function<byte[], V> deserializeValue;
     private final long startNs;
     private final Time time;
 
     MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
                                     final Sensor sensor,
                                     final StreamsMetrics metrics,
-                                    final StateSerdes<K, V> serdes,
+                                    final Function<byte[], K> deserializeKey,
+                                    final Function<byte[], V> deserializeValue,
                                     final Time time) {
         this.iter = iter;
         this.sensor = sensor;
         this.metrics = metrics;
-        this.serdes = serdes;
+        this.deserializeKey = deserializeKey;
+        this.deserializeValue = deserializeValue;
         this.startNs = time.nanoseconds();
         this.time = time;
     }
@@ -55,11 +59,11 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
     @Override
     public KeyValue<Windowed<K>, V> next() {
         final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
-        return KeyValue.pair(windowedKey(next.key), serdes.valueFrom(next.value));
+        return KeyValue.pair(windowedKey(next.key), deserializeValue.apply(next.value));
     }
 
     private Windowed<K> windowedKey(final Windowed<Bytes> bytesKey) {
-        final K key = serdes.keyFrom(bytesKey.key().get());
+        final K key = deserializeKey.apply(bytesKey.key().get());
         return new Windowed<>(key, bytesKey.window());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index e6e2740..d907183 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -51,8 +51,10 @@ public class RocksDBSessionStore
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
         return StoreQueryUtils.handleBasicQueries(
             query,
             positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 40415c2..c7d941b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -129,8 +129,10 @@ public class RocksDBWindowStore
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
         return StoreQueryUtils.handleBasicQueries(
             query,
             positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index e1d20c9..f0cbdc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -16,7 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
@@ -27,13 +30,20 @@ import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -65,6 +75,14 @@ public final class StoreQueryUtils {
             mkEntry(
                 KeyQuery.class,
                 StoreQueryUtils::runKeyQuery
+            ),
+            mkEntry(
+                WindowKeyQuery.class,
+                StoreQueryUtils::runWindowKeyQuery
+            ),
+            mkEntry(
+                WindowRangeQuery.class,
+                StoreQueryUtils::runWindowRangeQuery
             )
         );
 
@@ -197,6 +215,124 @@ public final class StoreQueryUtils {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
+                                                        final PositionBound positionBound,
+                                                        final boolean collectExecutionInfo,
+                                                        final StateStore store) {
+        if (store instanceof WindowStore) {
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
+                (WindowKeyQuery<Bytes, byte[]>) query;
+            final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
+            try {
+                if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
+                    final WindowStoreIterator<byte[]> iterator = windowStore.fetch(
+                        windowKeyQuery.getKey(),
+                        windowKeyQuery.getTimeFrom().get(),
+                        windowKeyQuery.getTimeTo().get()
+                    );
+                    return (QueryResult<R>) QueryResult.forResult(iterator);
+                } else {
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know how to"
+                            + " execute the given query (" + query + ") because it only supports"
+                            + " closed-range queries."
+                            + " Contact the store maintainer if you need support"
+                            + " for a new query type."
+                    );
+                }
+            } catch (final Exception e) {
+                final String message = parseStoreException(e, store, query);
+                return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
+            }
+        } else {
+            return QueryResult.forUnknownQueryType(query, store);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runWindowRangeQuery(final Query<R> query,
+                                                          final PositionBound positionBound,
+                                                          final boolean collectExecutionInfo,
+                                                          final StateStore store) {
+        if (store instanceof WindowStore) {
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
+                (WindowRangeQuery<Bytes, byte[]>) query;
+            final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
+            try {
+                // There's no store API for open time ranges
+                if (windowRangeQuery.getTimeFrom().isPresent() && windowRangeQuery.getTimeTo().isPresent()) {
+                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                        windowStore.fetchAll(
+                            windowRangeQuery.getTimeFrom().get(),
+                            windowRangeQuery.getTimeTo().get()
+                        );
+                    return (QueryResult<R>) QueryResult.forResult(iterator);
+                } else {
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know how to"
+                            + " execute the given query (" + query + ") because"
+                            + " WindowStores only supports WindowRangeQuery.withWindowStartRange."
+                            + " Contact the store maintainer if you need support"
+                            + " for a new query type."
+                    );
+                }
+            } catch (final Exception e) {
+                final String message = parseStoreException(e, store, query);
+                return QueryResult.forFailure(
+                    FailureReason.STORE_EXCEPTION,
+                    message
+                );
+            }
+        } else if (store instanceof SessionStore) {
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
+                (WindowRangeQuery<Bytes, byte[]>) query;
+            final SessionStore<Bytes, byte[]> sessionStore = (SessionStore<Bytes, byte[]>) store;
+            try {
+                if (windowRangeQuery.getKey().isPresent()) {
+                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = sessionStore.fetch(
+                        windowRangeQuery.getKey().get());
+                    return (QueryResult<R>) QueryResult.forResult(iterator);
+                } else {
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know how to"
+                            + " execute the given query (" + query + ") because"
+                            + " SessionStores only support WindowRangeQuery.withKey."
+                            + " Contact the store maintainer if you need support"
+                            + " for a new query type."
+                    );
+                }
+            } catch (final Exception e) {
+                final String message = parseStoreException(e, store, query);
+                return QueryResult.forFailure(
+                    FailureReason.STORE_EXCEPTION,
+                    message
+                );
+            }
+        } else {
+            return QueryResult.forUnknownQueryType(query, store);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes,
+                                                              final StateStore wrapped) {
+        final Serde<V> valueSerde = serdes.valueSerde();
+        final boolean timestamped = WrappedStateStore.isTimestamped(wrapped);
+        final Deserializer<V> deserializer;
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
+            final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) valueSerde).deserializer();
+            deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+        } else {
+            deserializer = valueSerde.deserializer();
+        }
+        return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
+    }
+
     private static <R> String parseStoreException(final Exception e, final StateStore store, final Query<R> query) {
         final StringWriter stringWriter = new StringWriter();
         final PrintWriter printWriter = new PrintWriter(stringWriter);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index cfb3860..a08a04e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
@@ -44,6 +45,8 @@ import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.RangeQuery;
 import org.apache.kafka.streams.query.StateQueryRequest;
 import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -54,6 +57,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -67,6 +71,7 @@ import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -107,13 +112,13 @@ public class IQv2StoreIntegrationTest {
     private static final Position INPUT_POSITION = Position.emptyPosition();
     private static final String STORE_NAME = "kv-store";
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final StoresToTest storeToTest;
+    private static final long RECORD_TIME = System.currentTimeMillis();
 
-    public static class UnknownQuery implements Query<Void> {
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    }
+    public static class UnknownQuery implements Query<Void> { }
 
+    private final StoresToTest storeToTest;
     private final boolean cache;
     private final boolean log;
 
@@ -244,6 +249,11 @@ public class IQv2StoreIntegrationTest {
                 return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
                     false);
             }
+
+            @Override
+            public boolean isWindowed() {
+                return true;
+            }
         },
         ROCKS_WINDOW {
             @Override
@@ -251,6 +261,16 @@ public class IQv2StoreIntegrationTest {
                 return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
                     false);
             }
+
+            @Override
+            public boolean isWindowed() {
+                return true;
+            }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }
         },
         TIME_ROCKS_WINDOW {
             @Override
@@ -258,18 +278,33 @@ public class IQv2StoreIntegrationTest {
                 return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
                     WINDOW_SIZE, false);
             }
+
+            @Override
+            public boolean isWindowed() {
+                return true;
+            }
         },
         IN_MEMORY_SESSION {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1));
             }
+
+            @Override
+            public boolean isSession() {
+                return true;
+            }
         },
         ROCKS_SESSION {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
             }
+
+            @Override
+            public boolean isSession() {
+                return true;
+            }
         };
 
         public abstract StoreSupplier<?> supplier();
@@ -285,6 +320,14 @@ public class IQv2StoreIntegrationTest {
         public boolean keyValue() {
             return false;
         }
+
+        public boolean isWindowed() {
+            return false;
+        }
+
+        public boolean isSession() {
+            return false;
+        }
     }
 
     @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
@@ -329,7 +372,7 @@ public class IQv2StoreIntegrationTest {
                     new ProducerRecord<>(
                         INPUT_TOPIC_NAME,
                         i % partitions,
-                        Time.SYSTEM.milliseconds(),
+                        RECORD_TIME,
                         i,
                         i,
                         null
@@ -497,6 +540,25 @@ public class IQv2StoreIntegrationTest {
                     shouldHandleRangeQueries(valueExtractor);
                 }
             }
+
+            if (storeToTest.isWindowed()) {
+                if (storeToTest.timestamped()) {
+                    final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
+                            ValueAndTimestamp::value;
+                    shouldHandleWindowKeyQueries(valueExtractor);
+                    shouldHandleWindowRangeQueries(valueExtractor);
+                } else {
+                    final Function<Integer, Integer> valueExtractor = Function.identity();
+                    shouldHandleWindowKeyQueries(valueExtractor);
+                    shouldHandleWindowRangeQueries(valueExtractor);
+                }
+            }
+
+            if (storeToTest.isSession()) {
+                // Note there's no "timestamped" differentiation here.
+                // Idiosyncratically, SessionStores are _never_ timestamped.
+                shouldHandleSessionKeyQueries();
+            }
         }
     }
 
@@ -532,6 +594,157 @@ public class IQv2StoreIntegrationTest {
         );
     }
 
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+        // tightest possible start range
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(2)
+        );
+
+        // miss the window start range
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
+        );
+
+        // miss the key
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet()
+        );
+
+        // miss both
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
+        );
+    }
+
+    private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer> extractor) {
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(0, 1, 2, 3)
+        );
+
+        // miss the window start
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
+        );
+
+        // Should fail to execute this query on a WindowStore.
+        final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2);
+
+        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> request =
+                inStore(STORE_NAME)
+                        .withQuery(query)
+                        .withPartitions(mkSet(0, 1))
+                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
+                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        if (result.getGlobalResult() != null) {
+            fail("global tables aren't implemented");
+        } else {
+            final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, T>>> queryResult =
+                result.getPartitionResults();
+            for (final int partition : queryResult.keySet()) {
+                final QueryResult<KeyValueIterator<Windowed<Integer>, T>> partitionResult =
+                    queryResult.get(partition);
+                final boolean failure = partitionResult.isFailure();
+                if (!failure) {
+                    throw new AssertionError(queryResult.toString());
+                }
+                assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
+                assertThat(partitionResult.getFailureMessage(), is(
+                    "This store"
+                        + " (class org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore)"
+                        + " doesn't know how to execute the given query"
+                        + " (WindowRangeQuery{key=Optional[2], timeFrom=Optional.empty, timeTo=Optional.empty})"
+                        + " because WindowStores only supports WindowRangeQuery.withWindowStartRange."
+                        + " Contact the store maintainer if you need support for a new query type."
+                ));
+            }
+        }
+    }
+
+    private <T> void shouldHandleSessionKeyQueries() {
+        shouldHandleSessionRangeQuery(
+            2,
+            mkSet(2)
+        );
+
+        // not preset, so empty result iter
+        shouldHandleSessionRangeQuery(
+            999,
+            mkSet()
+        );
+
+        // Should fail to execute this query on a SessionStore.
+        final WindowRangeQuery<Integer, T> query =
+            WindowRangeQuery.withWindowStartRange(
+                Instant.ofEpochMilli(0L),
+                Instant.ofEpochMilli(0L)
+            );
+
+        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        if (result.getGlobalResult() != null) {
+            fail("global tables aren't implemented");
+        } else {
+            final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, T>>> queryResult =
+                result.getPartitionResults();
+            for (final int partition : queryResult.keySet()) {
+                final QueryResult<KeyValueIterator<Windowed<Integer>, T>> partitionResult =
+                    queryResult.get(partition);
+                final boolean failure = partitionResult.isFailure();
+                if (!failure) {
+                    throw new AssertionError(queryResult.toString());
+                }
+                assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
+                assertThat(partitionResult.getFailureMessage(), is(
+                    "This store"
+                        + " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)"
+                        + " doesn't know how to execute the given query"
+                        + " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})"
+                        + " because SessionStores only support WindowRangeQuery.withKey."
+                        + " Contact the store maintainer if you need support for a new query type."
+                ));
+            }
+        }
+    }
+
     private void globalShouldRejectAllQueries() {
         // See KAFKA-13523
 
@@ -581,6 +794,41 @@ public class IQv2StoreIntegrationTest {
         );
     }
 
+    public <V> void shouldHandleKeyQuery(
+            final Integer key,
+            final Function<V, Integer> valueExtactor,
+            final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+                inStore(STORE_NAME)
+                        .withQuery(query)
+                        .withPartitions(mkSet(0, 1))
+                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult = result.getOnlyPartitionResult();
+        final boolean failure = queryResult.isFailure();
+        if (failure) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+        assertThrows(
+                IllegalArgumentException.class,
+                queryResult::getFailureMessage
+        );
+
+        final V result1 = queryResult.getResult();
+        final Integer integer = valueExtactor.apply(result1);
+        assertThat(integer, is(expectedValue));
+
+        assertThat(queryResult.getExecutionInfo(), is(empty()));
+    }
+
     public <V> void shouldHandleRangeQuery(
         final Optional<Integer> lower,
         final Optional<Integer> upper,
@@ -627,10 +875,10 @@ public class IQv2StoreIntegrationTest {
                     queryResult.get(partition)::getFailureMessage
                 );
 
-                final KeyValueIterator<Integer, V> iterator = queryResult.get(partition)
-                                                                         .getResult();
-                while (iterator.hasNext()) {
-                    actualValue.add(valueExtactor.apply(iterator.next().value));
+                try (final KeyValueIterator<Integer, V> iterator = queryResult.get(partition).getResult()) {
+                    while (iterator.hasNext()) {
+                        actualValue.add(valueExtactor.apply(iterator.next().value));
+                    }
                 }
                 assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
             }
@@ -638,39 +886,153 @@ public class IQv2StoreIntegrationTest {
         }
     }
 
-    public <V> void shouldHandleKeyQuery(
+    public <V> void shouldHandleWindowKeyQuery(
         final Integer key,
+        final Instant timeFrom,
+        final Instant timeTo,
         final Function<V, Integer> valueExtactor,
-        final Integer expectedValue) {
+        final Set<Integer> expectedValue) {
 
-        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
-        final StateQueryRequest<V> request =
+        final WindowKeyQuery<Integer, V> query = WindowKeyQuery.withKeyAndWindowStartRange(
+            key,
+            timeFrom,
+            timeTo
+        );
+
+        final StateQueryRequest<WindowStoreIterator<V>> request =
             inStore(STORE_NAME)
                 .withQuery(query)
                 .withPartitions(mkSet(0, 1))
                 .withPositionBound(PositionBound.at(INPUT_POSITION));
 
-        final StateQueryResult<V> result =
+        final StateQueryResult<WindowStoreIterator<V>> result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
-        final QueryResult<V> queryResult = result.getOnlyPartitionResult();
-        final boolean failure = queryResult.isFailure();
-        if (failure) {
-            throw new AssertionError(queryResult.toString());
+        if (result.getGlobalResult() != null) {
+            fail("global tables aren't implemented");
+        } else {
+            final Set<Integer> actualValue = new HashSet<>();
+            final Map<Integer, QueryResult<WindowStoreIterator<V>>> queryResult = result.getPartitionResults();
+            for (final int partition : queryResult.keySet()) {
+                final boolean failure = queryResult.get(partition).isFailure();
+                if (failure) {
+                    throw new AssertionError(queryResult.toString());
+                }
+                assertThat(queryResult.get(partition).isSuccess(), is(true));
+
+                assertThrows(
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureReason
+                );
+                assertThrows(
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureMessage
+                );
+
+                try (final WindowStoreIterator<V> iterator = queryResult.get(partition).getResult()) {
+                    while (iterator.hasNext()) {
+                        actualValue.add(valueExtactor.apply(iterator.next().value));
+                    }
+                }
+                assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
+            }
+            assertThat(actualValue, is(expectedValue));
         }
-        assertThat(queryResult.isSuccess(), is(true));
+    }
 
-        assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
-        assertThrows(
-            IllegalArgumentException.class,
-            queryResult::getFailureMessage
-        );
+    public <V> void shouldHandleWindowRangeQuery(
+            final Instant timeFrom,
+            final Instant timeTo,
+            final Function<V, Integer> valueExtactor,
+            final Set<Integer> expectedValue) {
 
-        final V result1 = queryResult.getResult();
-        final Integer integer = valueExtactor.apply(result1);
-        assertThat(integer, is(expectedValue));
+        final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withWindowStartRange(timeFrom, timeTo);
 
-        assertThat(queryResult.getExecutionInfo(), is(empty()));
+        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
+                inStore(STORE_NAME)
+                        .withQuery(query)
+                        .withPartitions(mkSet(0, 1))
+                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
+                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        if (result.getGlobalResult() != null) {
+            fail("global tables aren't implemented");
+        } else {
+            final Set<Integer> actualValue = new HashSet<>();
+            final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult = result.getPartitionResults();
+            for (final int partition : queryResult.keySet()) {
+                final boolean failure = queryResult.get(partition).isFailure();
+                if (failure) {
+                    throw new AssertionError(queryResult.toString());
+                }
+                assertThat(queryResult.get(partition).isSuccess(), is(true));
+
+                assertThrows(
+                        IllegalArgumentException.class,
+                        queryResult.get(partition)::getFailureReason
+                );
+                assertThrows(
+                        IllegalArgumentException.class,
+                        queryResult.get(partition)::getFailureMessage
+                );
+
+                try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
+                    while (iterator.hasNext()) {
+                        actualValue.add(valueExtactor.apply(iterator.next().value));
+                    }
+                }
+                assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
+            }
+            assertThat(actualValue, is(expectedValue));
+        }
+    }
+
+    public <V> void shouldHandleSessionRangeQuery(
+            final Integer key,
+            final Set<Integer> expectedValue) {
+
+        final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withKey(key);
+
+        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
+                inStore(STORE_NAME)
+                        .withQuery(query)
+                        .withPartitions(mkSet(0, 1))
+                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
+                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        if (result.getGlobalResult() != null) {
+            fail("global tables aren't implemented");
+        } else {
+            final Set<Integer> actualValue = new HashSet<>();
+            final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult = result.getPartitionResults();
+            for (final int partition : queryResult.keySet()) {
+                final boolean failure = queryResult.get(partition).isFailure();
+                if (failure) {
+                    throw new AssertionError(queryResult.toString());
+                }
+                assertThat(queryResult.get(partition).isSuccess(), is(true));
+
+                assertThrows(
+                        IllegalArgumentException.class,
+                        queryResult.get(partition)::getFailureReason
+                );
+                assertThrows(
+                        IllegalArgumentException.class,
+                        queryResult.get(partition)::getFailureMessage
+                );
+
+                try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
+                    while (iterator.hasNext()) {
+                        actualValue.add((Integer) iterator.next().value);
+                    }
+                }
+                assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
+            }
+            assertThat(actualValue, is(expectedValue));
+        }
     }
 
     public void shouldCollectExecutionInfo() {