You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/02 23:15:06 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #11567: KAFKA-13494: WindowKeyQuery and WindowRangeQuery

vvcephei commented on a change in pull request #11567:
URL: https://github.com/apache/kafka/pull/11567#discussion_r775709569



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -317,13 +318,25 @@ public boolean isOpen() {
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
+
+        if (query instanceof WindowRangeQuery) {

Review comment:
       Hey @patrickstuedi , Github won't let me comment on the prior conversation, but thanks for pointing out that oversight in the in-memory key-value store! I've submitted https://github.com/apache/kafka/pull/11630 to fix it.
   
   That was one code block that we overlooked when we moved all the query handling logic to StoreQueryUtils.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java
##########
@@ -50,4 +55,13 @@ public K getKey() {
     public Optional<Instant> getTimeTo() {
         return timeTo;
     }
+
+    @Override
+    public String toString() {
+        return "WindowKeyQuery{" +
+            "key=" + key +
+            ", timeFrom=" + timeFrom +
+            ", timeTo=" + timeTo +
+            '}';
+    }

Review comment:
       Added the `toString` so that the queries printed in exceptions will contain useful information.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -540,32 +545,19 @@ public void verifyStore() {
                 if (storeToTest.timestamped()) {
                     final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
                             ValueAndTimestamp::value;
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleWindowKeyQueries(2, timeFrom, timeTo, valueExtractor);
-                    shouldHandleWindowRangeQueries(timeFrom, timeTo, valueExtractor);
+                    shouldHandleWindowKeyQueries(valueExtractor);
+                    shouldHandleWindowRangeQueries(valueExtractor);
                 } else {
                     final Function<Integer, Integer> valueExtractor = Function.identity();
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleWindowKeyQueries(2, timeFrom, timeTo, valueExtractor);
-                    shouldHandleWindowRangeQueries(timeFrom, timeTo, valueExtractor);
+                    shouldHandleWindowKeyQueries(valueExtractor);
+                    shouldHandleWindowRangeQueries(valueExtractor);
                 }
             }
 
             if (storeToTest.isSession()) {
-                if (storeToTest.timestamped()) {
-                    final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
-                            ValueAndTimestamp::value;
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleSessionKeyQueries(2, valueExtractor);
-                } else {
-                    final Function<Integer, Integer> valueExtractor = Function.identity();
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleSessionKeyQueries(2, valueExtractor);
-                }
+                // Note there's no "timestamped" differentiation here.
+                // Idiosyncratically, SessionStores are _never_ timestamped.
+                shouldHandleSessionKeyQueries();

Review comment:
       This isn't our fault. When we added the timestamped stores, we chose not to make SessionStores timestamped because the session bounds already have the end timestamp available, which is identical to the timestamp we would have stored in the value.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -906,10 +948,10 @@ public void shouldRejectUnknownQuery() {
                         queryResult.get(partition)::getFailureMessage
                 );
 
-                final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition)
-                        .getResult();
-                while (iterator.hasNext()) {
-                    actualValue.add(valueExtactor.apply(iterator.next().value));
+                try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
+                    while (iterator.hasNext()) {
+                        actualValue.add((Integer) iterator.next().value);
+                    }

Review comment:
       Huh, looks like @vpapavas and I missed the need to close the iterator before.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -93,20 +92,10 @@ Position getPosition() {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public <R> QueryResult<R> query(
-        final Query<R> query,
-        final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
-
-        if (query instanceof RangeQuery) {
-            final RangeQuery<Bytes, byte[]> typedQuery = (RangeQuery<Bytes, byte[]>) query;
-            final KeyValueIterator<Bytes, byte[]> keyValueIterator =  this.range(
-                    typedQuery.getLowerBound().orElse(null), typedQuery.getUpperBound().orElse(null));
-            final R result = (R) keyValueIterator;
-            final QueryResult<R> queryResult = QueryResult.forResult(result);
-            return queryResult;
-        }

Review comment:
       Dropped this unnecessary duplicate code, as discussed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -58,7 +58,7 @@
         final Query<R> query,
         final StateStore store) {
 
-        return new FailedQueryResult<>(
+        return forFailure(

Review comment:
       This just made it easier to inline the "unknown query" message.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java
##########
@@ -25,17 +25,22 @@
 
 @Evolving
 public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {
+
     private final K key;
     private final Optional<Instant> timeFrom;
     private final Optional<Instant> timeTo;
 
-    private WindowKeyQuery(final K key, final Optional<Instant> timeTo, final Optional<Instant> timeFrom) {
+    private WindowKeyQuery(final K key,
+                           final Optional<Instant> timeTo,
+                           final Optional<Instant> timeFrom) {

Review comment:
       Since I was fixing stuff anyway, I went ahead and fixed a bunch of formatting issues that I didn't bother mentioning before.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -411,23 +422,40 @@ public void close() {
                                              final boolean collectExecutionInfo) {
         final QueryResult<R> result;
         final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
-        if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
-            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery = WindowRangeQuery.withWindowStartRange(typedQuery.getTimeFrom().get(), typedQuery.getTimeTo().get());
-            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (typedQuery.getKey().isPresent()) {

Review comment:
       Looks like we were not handling the right query variant before, but it didn't come up yet because other tests were failing before we got to this point.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -296,8 +297,8 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         final QueryResult<byte[]> rawResult =
             wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
         if (rawResult.isSuccess()) {
-            final Deserializer<V> deserializer = getValueDeserializer();
-            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final Function<byte[], V> deserializer = getDeserializeValue(serdes, wrapped());
+            final V value = deserializer.apply(rawResult.getResult());

Review comment:
       Note, the new version in StoreQueryUtils returns a Function, so that the iterators can just invoke the function on the value without having to know the right topic to pass in to the deserializer.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/WindowRangeQuery.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.time.Instant;
+import java.util.Optional;
+
+public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {

Review comment:
       ```suggestion
   @Evolving
   public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -411,23 +422,40 @@ public void close() {
                                              final boolean collectExecutionInfo) {
         final QueryResult<R> result;
         final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
-        if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
-            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery = WindowRangeQuery.withWindowStartRange(typedQuery.getTimeFrom().get(), typedQuery.getTimeTo().get());
-            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (typedQuery.getKey().isPresent()) {
+            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+                WindowRangeQuery.withKey(
+                    Bytes.wrap(serdes.rawKey(typedQuery.getKey().get()))
+                );
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
             if (rawResult.isSuccess()) {
-                final MeteredWindowedKeyValueIterator typedResult = new MeteredWindowedKeyValueIterator(rawResult.getResult(),
+                final MeteredWindowedKeyValueIterator<K, V> typedResult =
+                    new MeteredWindowedKeyValueIterator<>(
+                        rawResult.getResult(),
                         fetchSensor,
                         streamsMetrics,
-                        serdes,
-                        time);
-                final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(typedResult);
+                        serdes::keyFrom,
+                        StoreQueryUtils.getDeserializeValue(serdes, wrapped()),

Review comment:
       This is the reason I exploded the `serdes` reference in favor of functions for deserializing the key and value. When we're handling queries for non-timestamped stores, we need to be able to adapt the value deserializer.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -308,21 +309,6 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return result;
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private Deserializer<V> getValueDeserializer() {

Review comment:
       I moved this to StoreQueryUtils because we need it in WindowStore as well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java
##########
@@ -86,4 +86,14 @@ public R getResult() {
             "Cannot get result for failed query. Failure is " + failureReason.name() + ": "
                 + failure);
     }
+
+    @Override
+    public String toString() {
+        return "FailedQueryResult{" +
+            "failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", executionInfo=" + getExecutionInfo() +
+            ", position=" + getPosition() +
+            '}';
+    }

Review comment:
       It's pretty hard to debug test failures without these, so I added them.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
##########
@@ -52,27 +51,17 @@ Position getPosition() {
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
-
-        if (query instanceof WindowRangeQuery) {
-            @SuppressWarnings("unchecked") final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
-            if (windowRangeQuery.getKey().isPresent()) {
-                final Bytes key = windowRangeQuery.getKey().get();
-                final KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator = this.fetch(key);
-                @SuppressWarnings("unchecked") final R result = (R) keyValueIterator;
-                final QueryResult<R> queryResult = QueryResult.forResult(result);
-                return queryResult;
-            }
-        }

Review comment:
       Dropped duplicate code, as discussed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -316,27 +315,17 @@ public boolean isOpen() {
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
-        final boolean collectExecutionInfo) {
-
-        if (query instanceof WindowRangeQuery) {
-            @SuppressWarnings("unchecked") final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
-            if (windowRangeQuery.getKey().isPresent()) {
-                final Bytes key = windowRangeQuery.getKey().get();
-                final KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator = this.fetch(key);
-                @SuppressWarnings("unchecked") final R result = (R) keyValueIterator;
-                final QueryResult<R> queryResult = QueryResult.forResult(result);
-                return queryResult;
-            }
-        }

Review comment:
       Dropped this unnecessary duplicate code, as discussed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -247,7 +248,8 @@ public V fetchSession(final K key, final long earliestSessionEndTime, final long
             wrapped().fetch(keyBytes(key)),
             fetchSensor,
             streamsMetrics,
-            serdes,
+            serdes::keyFrom,
+            serdes::valueFrom,

Review comment:
       Explained below

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -112,13 +112,13 @@
     private static final Position INPUT_POSITION = Position.emptyPosition();
     private static final String STORE_NAME = "kv-store";
 
+    private static final long RECORD_TIME = System.currentTimeMillis();

Review comment:
       So that we can deterministically compute the ranges to search, I went ahead and gave the records a well-known timestamp.
   
   A future improvement to the tests could be to add records that will fall into different windows, but this is good enough for now.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -214,73 +217,120 @@ public static boolean isPermitted(
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
-                                                  final PositionBound positionBound,
-                                                  final boolean collectExecutionInfo,
-                                                  final StateStore store) {
+                                                        final PositionBound positionBound,
+                                                        final boolean collectExecutionInfo,
+                                                        final StateStore store) {
         if (store instanceof WindowStore) {
-            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = (WindowKeyQuery<Bytes, byte[]>) query;
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
+                (WindowKeyQuery<Bytes, byte[]>) query;
             final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
             try {
                 if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
-                    final WindowStoreIterator<byte[]> iterator = windowStore.fetch(windowKeyQuery.getKey(), windowKeyQuery.getTimeFrom().get(), windowKeyQuery.getTimeTo().get());
+                    final WindowStoreIterator<byte[]> iterator = windowStore.fetch(
+                        windowKeyQuery.getKey(),
+                        windowKeyQuery.getTimeFrom().get(),
+                        windowKeyQuery.getTimeTo().get()
+                    );
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowKeyQuery requires key and window bounds to be present");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know how to"
+                            + " execute the given query (" + query + ") because it only supports"
+                            + " closed-range queries."
+                            + " Contact the store maintainer if you need support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
-                return QueryResult.forFailure(
-                        FailureReason.STORE_EXCEPTION,
-                        message
-                );
+                return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
             }
         } else {
-            return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "Support for WindowKeyQuery's is currently restricted to stores of type WindowStore");
+            return QueryResult.forUnknownQueryType(query, store);
         }
     }
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowRangeQuery(final Query<R> query,
-                                                        final PositionBound positionBound,
-                                                        final boolean collectExecutionInfo,
-                                                        final StateStore store) {
+                                                          final PositionBound positionBound,
+                                                          final boolean collectExecutionInfo,
+                                                          final StateStore store) {
         if (store instanceof WindowStore) {
-            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
+                (WindowRangeQuery<Bytes, byte[]>) query;
             final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
             try {
+                // There's no store API for open time ranges
                 if (windowRangeQuery.getTimeFrom().isPresent() && windowRangeQuery.getTimeTo().isPresent()) {
-                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = windowStore.fetchAll(windowRangeQuery.getTimeFrom().get(), windowRangeQuery.getTimeTo().get());
+                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                        windowStore.fetchAll(
+                            windowRangeQuery.getTimeFrom().get(),
+                            windowRangeQuery.getTimeTo().get()
+                        );
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowRangeQuery requires window bounds to be present when run against a WindowStore");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know how to"
+                            + " execute the given query (" + query + ") because"
+                            + " WindowStores only supports WindowRangeQuery.withWindowStartRange."
+                            + " Contact the store maintainer if you need support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
                 return QueryResult.forFailure(
-                        FailureReason.STORE_EXCEPTION,
-                        message
+                    FailureReason.STORE_EXCEPTION,
+                    message
                 );
             }
         } else if (store instanceof SessionStore) {
-            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
+                (WindowRangeQuery<Bytes, byte[]>) query;
             final SessionStore<Bytes, byte[]> sessionStore = (SessionStore<Bytes, byte[]>) store;
             try {
                 if (windowRangeQuery.getKey().isPresent()) {
-                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = sessionStore.fetch(windowRangeQuery.getKey().get());
+                    final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = sessionStore.fetch(
+                        windowRangeQuery.getKey().get());
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowRangeQuery requires key to be present when run against a SessionStore");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know how to"
+                            + " execute the given query (" + query + ") because"
+                            + " SessionStores only support WindowRangeQuery.withKey."
+                            + " Contact the store maintainer if you need support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
                 return QueryResult.forFailure(
-                        FailureReason.STORE_EXCEPTION,
-                        message
+                    FailureReason.STORE_EXCEPTION,
+                    message
                 );
             }
         } else {
-            return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "Support for WindowRangeQuery's is currently restricted to Window and Session stores");
+            return QueryResult.forUnknownQueryType(query, store);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes,
+                                                              final StateStore wrapped) {

Review comment:
       Moved from the MeteredKeyValueStore. I still hope we can refactor the store hierarchy later to get rid of this entirely.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -411,23 +422,40 @@ public void close() {
                                              final boolean collectExecutionInfo) {
         final QueryResult<R> result;
         final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
-        if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
-            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery = WindowRangeQuery.withWindowStartRange(typedQuery.getTimeFrom().get(), typedQuery.getTimeTo().get());
-            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (typedQuery.getKey().isPresent()) {
+            final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+                WindowRangeQuery.withKey(
+                    Bytes.wrap(serdes.rawKey(typedQuery.getKey().get()))
+                );
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
             if (rawResult.isSuccess()) {
-                final MeteredWindowedKeyValueIterator typedResult = new MeteredWindowedKeyValueIterator(rawResult.getResult(),
+                final MeteredWindowedKeyValueIterator<K, V> typedResult =
+                    new MeteredWindowedKeyValueIterator<>(
+                        rawResult.getResult(),
                         fetchSensor,
                         streamsMetrics,
-                        serdes,
-                        time);
-                final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(typedResult);
+                        serdes::keyFrom,
+                        StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
+                        time
+                    );
+                final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
+                    QueryResult.forResult(typedResult);
                 result = (QueryResult<R>) typedQueryResult;
             } else {
                 // the generic type doesn't matter, since failed queries have no result set.
                 result = (QueryResult<R>) rawResult;
             }
         } else {
-            result = QueryResult.forUnknownQueryType(query, this);
+
+            result = QueryResult.forFailure(
+                FailureReason.UNKNOWN_QUERY_TYPE,
+                "This store (" + getClass() + ") doesn't know how to"
+                    + " execute the given query (" + query + ") because"
+                    + " SessionStores only support WindowRangeQuery.withKey."
+                    + " Contact the store maintainer if you need support"
+                    + " for a new query type."
+            );

Review comment:
       More explanatory error, as discussed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -214,73 +217,120 @@ public static boolean isPermitted(
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
-                                                  final PositionBound positionBound,
-                                                  final boolean collectExecutionInfo,
-                                                  final StateStore store) {
+                                                        final PositionBound positionBound,
+                                                        final boolean collectExecutionInfo,
+                                                        final StateStore store) {
         if (store instanceof WindowStore) {
-            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = (WindowKeyQuery<Bytes, byte[]>) query;
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
+                (WindowKeyQuery<Bytes, byte[]>) query;
             final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
             try {
                 if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
-                    final WindowStoreIterator<byte[]> iterator = windowStore.fetch(windowKeyQuery.getKey(), windowKeyQuery.getTimeFrom().get(), windowKeyQuery.getTimeTo().get());
+                    final WindowStoreIterator<byte[]> iterator = windowStore.fetch(
+                        windowKeyQuery.getKey(),
+                        windowKeyQuery.getTimeFrom().get(),
+                        windowKeyQuery.getTimeTo().get()
+                    );
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowKeyQuery requires key and window bounds to be present");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know how to"
+                            + " execute the given query (" + query + ") because it only supports"
+                            + " closed-range queries."
+                            + " Contact the store maintainer if you need support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {
                 final String message = parseStoreException(e, store, query);
-                return QueryResult.forFailure(
-                        FailureReason.STORE_EXCEPTION,
-                        message
-                );
+                return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
             }
         } else {
-            return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "Support for WindowKeyQuery's is currently restricted to stores of type WindowStore");
+            return QueryResult.forUnknownQueryType(query, store);

Review comment:
       This message didn't need to be specialized.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -214,73 +217,120 @@ public static boolean isPermitted(
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
-                                                  final PositionBound positionBound,
-                                                  final boolean collectExecutionInfo,
-                                                  final StateStore store) {
+                                                        final PositionBound positionBound,
+                                                        final boolean collectExecutionInfo,
+                                                        final StateStore store) {
         if (store instanceof WindowStore) {
-            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = (WindowKeyQuery<Bytes, byte[]>) query;
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
+                (WindowKeyQuery<Bytes, byte[]>) query;
             final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
             try {
                 if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
-                    final WindowStoreIterator<byte[]> iterator = windowStore.fetch(windowKeyQuery.getKey(), windowKeyQuery.getTimeFrom().get(), windowKeyQuery.getTimeTo().get());
+                    final WindowStoreIterator<byte[]> iterator = windowStore.fetch(
+                        windowKeyQuery.getKey(),
+                        windowKeyQuery.getTimeFrom().get(),
+                        windowKeyQuery.getTimeTo().get()
+                    );
                     return (QueryResult<R>) QueryResult.forResult(iterator);
                 } else {
-                    return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowKeyQuery requires key and window bounds to be present");
+                    return QueryResult.forFailure(
+                        FailureReason.UNKNOWN_QUERY_TYPE,
+                        "This store (" + store.getClass() + ") doesn't know how to"
+                            + " execute the given query (" + query + ") because it only supports"
+                            + " closed-range queries."
+                            + " Contact the store maintainer if you need support"
+                            + " for a new query type."
+                    );
                 }
-            } catch(final Exception e){
+            } catch (final Exception e) {

Review comment:
       This was the checkstyle error that was failing your build.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -266,6 +266,11 @@ public boolean isWindowed() {
             public boolean isWindowed() {
                 return true;
             }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }

Review comment:
       I think this was my bad from before. This store is not a timestamped store.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -602,33 +594,78 @@ public void verifyStore() {
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Integer key, final Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;

Review comment:
       This is why the test was failing for you. The query is for a range of window start times, not record times. Since the window size is five minutes, the range `[now - 1 minute, now]` wasn't going to contain the actual window start time of `now - 5 minutes`. In other words, just a simple oversight :/ Sorry for the trouble. 

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -602,33 +594,78 @@ public void verifyStore() {
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Integer key, final Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+        // tightest possible start range
         shouldHandleWindowKeyQuery(
-                key,
-                timeFrom,
-                timeTo,
-                extractor,
-                mkSet(1, 2, 3)
+            2,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(2)

Review comment:
       Since we're specifying the key, we expect only to get back windows with the value for that key. The aggregation we specified is to sum all values for the key, and it comes out to `2` because we only write one value for each key; namely, the value is the same number as the key.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -367,7 +372,7 @@ public static void before()
                     new ProducerRecord<>(
                         INPUT_TOPIC_NAME,
                         i % partitions,
-                        Time.SYSTEM.milliseconds(),
+                        RECORD_TIME,

Review comment:
       Setting the records' timestamps.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -540,32 +545,19 @@ public void verifyStore() {
                 if (storeToTest.timestamped()) {
                     final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
                             ValueAndTimestamp::value;
-                    final Instant timeTo = Instant.now();
-                    final Instant timeFrom = timeTo.minusSeconds(60);
-                    shouldHandleWindowKeyQueries(2, timeFrom, timeTo, valueExtractor);
-                    shouldHandleWindowRangeQueries(timeFrom, timeTo, valueExtractor);
+                    shouldHandleWindowKeyQueries(valueExtractor);
+                    shouldHandleWindowRangeQueries(valueExtractor);

Review comment:
       I moved the bounds into the plural check method, so we can check correct behavior for multiple bounds.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -602,33 +594,78 @@ public void verifyStore() {
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Integer key, final Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+        // tightest possible start range
         shouldHandleWindowKeyQuery(
-                key,
-                timeFrom,
-                timeTo,
-                extractor,
-                mkSet(1, 2, 3)
+            2,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(2)
+        );
 
+        // miss the window start range
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
+        );
+
+        // miss the key
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet()
+        );
+
+        // miss both
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
         );

Review comment:
       It seemed like a good idea to check a few other query configurations, but none of them showed any problems.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -602,34 +594,155 @@ public void verifyStore() {
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Integer key, final Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extractor) {
+
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+        // tightest possible start range
         shouldHandleWindowKeyQuery(
-                key,
-                timeFrom,
-                timeTo,
-                extractor,
-                mkSet(1, 2, 3)
+            2,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(2)
+        );
 
+        // miss the window start range
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
+        );
+
+        // miss the key
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet()
+        );
+
+        // miss both
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
         );
     }
 
-    private <T> void shouldHandleWindowRangeQueries(final Instant timeFrom, final Instant timeTo, final Function<T, Integer> extractor) {
+    private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer> extractor) {
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
         shouldHandleWindowRangeQuery(
-                timeFrom,
-                timeTo,
-                extractor,
-                mkSet(1, 2, 3)
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(0, 1, 2, 3)
+        );
 
+        // miss the window start
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
         );
+
+        // Should fail to execute this query on a WindowStore.

Review comment:
       I also added some cases to test the specialized failure messages.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -803,18 +845,18 @@ public void shouldRejectUnknownQuery() {
                 assertThat(queryResult.get(partition).isSuccess(), is(true));
 
                 assertThrows(
-                        IllegalArgumentException.class,
-                        queryResult.get(partition)::getFailureReason
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureReason
                 );
                 assertThrows(
-                        IllegalArgumentException.class,
-                        queryResult.get(partition)::getFailureMessage
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureMessage
                 );
 
-                final WindowStoreIterator<V> iterator = queryResult.get(partition)
-                        .getResult();
-                while (iterator.hasNext()) {
-                    actualValue.add(valueExtactor.apply(iterator.next().value));
+                try (final WindowStoreIterator<V> iterator = queryResult.get(partition).getResult()) {
+                    while (iterator.hasNext()) {
+                        actualValue.add(valueExtactor.apply(iterator.next().value));
+                    }

Review comment:
       These iterators need to be closed or they'll leak resources (it's the same for IQv1 as well).

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -350,9 +353,35 @@ public boolean isOpen() {
         return open;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
+
+        if (query instanceof WindowKeyQuery) {
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = (WindowKeyQuery<Bytes, byte[]>) query;
+            if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
+                final Bytes key = windowKeyQuery.getKey();
+                final Instant lower = windowKeyQuery.getTimeFrom().get();
+                final Instant upper = windowKeyQuery.getTimeTo().get();
+                final WindowStoreIterator<byte[]> iterator = this.fetch(key, lower, upper);
+                final R result = (R) iterator;
+                final QueryResult<R> queryResult = QueryResult.forResult(result);
+                return queryResult;
+            }
+        } else if (query instanceof WindowRangeQuery) {
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
+            if (windowRangeQuery.getTimeFrom().isPresent() &&
+                    windowRangeQuery.getTimeTo().isPresent()) {

Review comment:
       Ah, I was previously mistaken about this. My prior comment was made with your KIP-763 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-763%3A+Range+queries+with+open+endpoints) in mind.
   
   Upon looking at the WindowStore code and the KIP, I now see that we don't support open ranges on window stores, and that the KIP was only about KeyValue range queries.
   
   I'm also just now catching on that you're using `Optional.of` there, not `Optional.ofNullable`, so it's always the case that we either have both bounds and no key or a key with no bounds.
   
   I had a second concern that the window store is not handling the case of a range query with a key and no bounds, but upon examination of the WindowStore, I can see that there is no method to handle that case, so it makes sense to return "unknown query" for it (since the objective at this moment is parity). It might be nice if we additionally explain that the error is because the store can't handle the parameterization rather than the query itself, though.
   
   And finally, just to clarify, I do think we should consolidate on handling the queries in `StoreQueryUtils` rather than in the stores themselves, so these comments are meant to apply to the handler in that class, and we should just delete the code in this class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org