Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/09 03:58:47 UTC

[GitHub] [kafka] vvcephei opened a new pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

vvcephei opened a new pull request #11582:
URL: https://github.com/apache/kafka/pull/11582


   Implement the KeyQuery and RawKeyQuery as proposed in KIP-796
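   For illustration, a minimal sketch of how such a key query could be issued once this lands. The store name and types are placeholders, and the `KafkaStreams#query` entry point and `getOnlyPartitionResult()` accessor are assumed from KIP-796 rather than shown in this PR:
   
   ```java
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.query.KeyQuery;
   import org.apache.kafka.streams.query.PositionBound;
   import org.apache.kafka.streams.query.QueryResult;
   import org.apache.kafka.streams.query.StateQueryRequest;
   import org.apache.kafka.streams.query.StateQueryResult;
   
   public class KeyQueryExample {
       // Look up the value for key 1 in a hypothetical store named "my-kv-store".
       public static Integer lookup(final KafkaStreams kafkaStreams) {
           final KeyQuery<Integer, Integer> query = KeyQuery.withKey(1);
           final StateQueryRequest<Integer> request =
               StateQueryRequest.inStore("my-kv-store")
                   .withQuery(query)
                   .withPositionBound(PositionBound.unbounded());
   
           final StateQueryResult<Integer> result = kafkaStreams.query(request);
   
           // A single key lives in exactly one partition, so collapse the per-partition results.
           // Assumes the key exists; a missing key would need additional null/failure handling.
           final QueryResult<Integer> onlyResult = result.getOnlyPartitionResult();
           return onlyResult.getResult();
       }
   }
   ```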
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771739597



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
       Sorry, I missed this thread before. I think these points are discussed on other threads in this PR, though. Tl;dr: I think we should aim to clean this up in https://issues.apache.org/jira/browse/KAFKA-13526
   
   For now, I believe this logic is correct. However, it's good that you pointed out we're only testing all _dsl_ store combinations. I filed https://issues.apache.org/jira/browse/KAFKA-13553 to extend the IT to also test all _papi_ store combinations.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771517463



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                PingQuery.class,

Review comment:
       Actually, the PingQuery isn't in the KIP at all. I added it (as an internal API) so that I could verify the stores work properly in the absence of any actual queries (because I implemented the framework before any real queries, to control the scope of the PR).
   
   Now that we have real queries, I don't think we need to keep Ping around. I'll remove it.
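    For reference, once Ping is gone the static dispatch map here boils down to one entry per real query type. Roughly (a sketch; the handler method references and map type match hunks later in this thread, but the exact modifiers in the PR may differ):
    
    ```java
    // Illustrative shape of the dispatch table after PingQuery is removed.
    @SuppressWarnings("rawtypes")
    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
        mkMap(
            mkEntry(RangeQuery.class, StoreQueryUtils::runRangeQuery),
            mkEntry(KeyQuery.class, StoreQueryUtils::runKeyQuery)
        );
    ```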







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737792



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface

Review comment:
       I see. So we should add it elsewhere, too (of course not as part of the IQ work).







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771738427



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -42,16 +102,21 @@ private StoreQueryUtils() {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
       🤔 







[GitHub] [kafka] guozhangwang commented on pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#issuecomment-998160736


   LGTM, I believe @vvcephei  will address the remaining comments right after it's merged.





[GitHub] [kafka] vpapavas commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r768812726



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;

Review comment:
       Javadocs :)







[GitHub] [kafka] vpapavas commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r768807046



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {

Review comment:
        We don't really need this. You can just call `QueryResult.forResult` in the `MeteredKeyValue` store, for example, to get the typed result.







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771734218



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
        We can do this, but would it be better to say "passed down" rather than "handled" in the message?







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771466959



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -29,10 +29,10 @@
  */
 public final class QueryResult<R> {
 
-    private final List<String> executionInfo = new LinkedList<>();
     private final FailureReason failureReason;
     private final String failure;
     private final R result;
+    private List<String> executionInfo = new LinkedList<>();

Review comment:
       Thanks; that would be another way to do it. I'm not sure if that would be clearly better or not, though.
   
    It's for getting more details about how the query was actually executed inside of Streams. Right now, if you request it as part of the query, each store layer will report what it did and how long it took. For runtime queries, you wouldn't want to use it, but I wanted to enable debugging in cases where query execution seems to be taking longer than expected. It could also be used for tracing, where every Nth query is run with execution info turned on.
   
   It's a list of Strings so that each store layer / operation can just add one "line" of info (like a stack trace), but we don't waste time and memory actually concatenating them with newlines. We considered adding more structure (such as having a field for execution time), but kept it as a string so as not to restrict the kind of "execution information" we might find useful to add in the future.
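    To make the debugging/tracing use concrete, a hedged sketch of the caller side, assuming the `enableExecutionInfo()` and `getExecutionInfo()` accessors from KIP-796 (they are not shown in this thread); store name and types are placeholders:
    
    ```java
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.query.KeyQuery;
    import org.apache.kafka.streams.query.QueryResult;
    import org.apache.kafka.streams.query.StateQueryRequest;
    import org.apache.kafka.streams.query.StateQueryResult;
    
    public class ExecutionInfoExample {
        public static void debugQuery(final KafkaStreams kafkaStreams) {
            final KeyQuery<Integer, Integer> query = KeyQuery.withKey(1);
            final StateQueryRequest<Integer> request =
                StateQueryRequest.inStore("my-kv-store")
                    .withQuery(query)
                    .enableExecutionInfo(); // off by default; each store layer then adds one trace line
    
            final StateQueryResult<Integer> result = kafkaStreams.query(request);
            final QueryResult<Integer> partitionResult = result.getOnlyPartitionResult();
    
            // One line per store layer, e.g.
            // "Handled in class ...MeteredKeyValueStore with serdes ... in 12345ns"
            partitionResult.getExecutionInfo().forEach(System.out::println);
        }
    }
    ```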







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771492764



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
       Yeah, the idea was to actually be able to see everything that happened during query execution, specifically to demystify what's going on when you're debugging.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r767312544



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
       This is a bit of a hack. I think we can possibly do a better job of this in the Timestamped facade that we insert in front of non-timestamped stores, but I don't want to over-engineer it until we have a few different query types in the code base to make sure that what we do works in general.







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737576



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
       > Also, because we always wrap the non-timestamped store with the KeyValueToTimestampedKeyValueByteStoreAdapter, we also always pass through the MeteredTimestampedKeyValue store whether the inner store is really timestamped or not.
   
   I don't think so. We only do this in the DSL, but not the PAPI.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771495764



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+@Evolving
+public class KeyQuery<K, V> implements Query<V> {

Review comment:
        Good idea. I'm not sure whether it will ultimately be good to extend queries with other queries later, but it doesn't hurt to add this now so that we can make an explicit decision about it later.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737653



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
       I'm getting the impression that you're not a huge fan of the phrasing of these messages. :) Can we tackle this question in a follow-on fashion?







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771459374



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##########
@@ -52,5 +52,12 @@
      * The requested store partition does not exist at all. For example, partition 4 was requested,
      * but the store in question only has 4 partitions (0 through 3).
      */
-    DOES_NOT_EXIST;
+    DOES_NOT_EXIST,
+
+    /**
+     * The store that handled the query got an exception during query execution. The message
+     * will contain the exception details. Depending on the nature of the exception, the caller
+     * may be able to retry this instance or may need to try a different instance.
+     */
+    STORE_EXCEPTION;

Review comment:
       Thanks! @guozhangwang reminded me during the discussion to make sure that all the cases in that KIP were accounted for. Some are still exceptions, and some are now FailureReasons: https://lists.apache.org/thread/brvwvpvsbsfvqpqg6jvry5hqny0vm2tr







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771505654



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
       Yes, this is super weird, and I think that https://issues.apache.org/jira/browse/KAFKA-13526 will give us a more elegant way to correct it, but as it stands right now, this is necessary.
   
   The MeteredStore's serde is always a ValueAndTimestamp serde regardless of whether the inner store is Timestamped or not. This works because the normal execution flow actually converts the byte results from non-timestamped stores into the binary schema of a ValueAndTimestamp.
   
   What we do is, when you have a non-timestamped store, we wrap it with an extra layer (`org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter`) that pads the returned values with a fake timestamp (`org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat`). That makes sense when the store is used by processors (particularly the ones in the DSL) because it makes the store configuration orthogonal to the processor logic, but for IQ, it's just spending extra time and memory for no productive purpose.
   
    One of the primary design goals of IQv2 is to make query execution as lean as possible, so we did not implement the same padding logic for non-timestamped data and instead just bubble the actual byte array returned from the BytesStore up to the MeteredStore. That means that if we want to deserialize it, we need to know whether to use the ValueAndTimestamp deserializer or just the value's deserializer.
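    To spell that out, here is the selection branch from the hunk above, condensed into a hypothetical helper with comments (the classes and fields are the ones referenced in this thread; the helper itself is not part of the PR):
    
    ```java
    // Hypothetical helper extracted from the hunk above for readability.
    @SuppressWarnings("unchecked")
    private Deserializer<V> valueDeserializerFor(final StateStore inner) {
        final Serde<V> vSerde = serdes.valueSerde(); // the serde configured on this Metered layer
        final boolean timestamped = WrappedStateStore.isTimestamped(inner);
        if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
            // Non-timestamped bytes store: IQv2 bubbles up the plain value bytes without the
            // fake-timestamp padding, so unwrap and use the inner value deserializer.
            final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
                (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
            return (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
        } else {
            // Timestamped store (or a plain value serde): the bytes already match the serde's schema.
            return vSerde.deserializer();
        }
    }
    ```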







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
       > I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.
   
    If there is no strict reason not to throw an `IllegalStateException`, I would strongly advocate throwing. It not only guards against potential bugs, but also expresses the semantics much more clearly for developers (i.e., us) and makes the code easier to read and reason about.
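    Something like the following, for concreteness (a sketch of the throwing variant; the field and constructor names follow the hunks quoted in this thread, and the message text is illustrative):
    
    ```java
    public <V> QueryResult<V> swapResult(final V value) {
        if (isFailure()) {
            // Swapping the result of a failed query is a programming error, so fail fast.
            throw new IllegalStateException(
                "Cannot swap the result of a failed query: " + failureReason + " (" + failure + ")");
        }
        final QueryResult<V> newResult = new QueryResult<>(value);
        // Carry the execution info collected so far over to the new, typed result.
        newResult.executionInfo = executionInfo;
        return newResult;
    }
    ```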







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771494393



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {

Review comment:
       Sorry about that; oversight.







[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771726705



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -253,12 +262,17 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
             rawRangeQuery = RangeQuery.withNoBounds();
         }
         final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-                wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
+            wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
             final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
-                    iterator, getSensor, getValueDeserializer());
-            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(resultIterator);
+                iterator,
+                getSensor,
+                getValueDeserializer()
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult(
+                resultIterator

Review comment:
       nit: why newline with just one parameter?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+/**
+ * Interactive query for retrieving a single record based on its key.
+ */
+@Evolving
+public final class KeyQuery<K, V> implements Query<V> {
+
+    private final K key;
+
+    private KeyQuery(final K key) {
+        this.key = key;
+    }
+
+    /**
+     * Creates a query that will retrieve the record identified by {@code key} if it exists
+     * (or {@code null} otherwise).
+     * @param key The key to retrieve
+     * @param <K> The type of the key
+     * @param <V> The type of the value that will be retrieved
+     */
+    public static <K, V> KeyQuery<K, V> withKey(final K key) {
+        return new KeyQuery<>(key);

Review comment:
        Should we check that the `key` is not null here? In later callers, e.g. `final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));`, we do not check whether `getKey()` is null, and the `keyBytes` function could throw if it is.
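    Something like this, presumably (a minimal sketch of the suggested guard, assuming `java.util.Objects` is imported):
    
    ```java
    public static <K, V> KeyQuery<K, V> withKey(final K key) {
        // Fail at the call site rather than later inside keyBytes(...) on a null key.
        return new KeyQuery<>(Objects.requireNonNull(key, "key cannot be null"));
    }
    ```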

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound positionBound,
+                                                  final boolean collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment:
       Should we use `swap` here as well?
   
    Also, I'm thinking maybe we can introduce an internal class extending `KeyQuery<?, byte[]>` and define `swap` only in that class (see my other comment above).

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound positionBound,
+                                                  final boolean collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);
+            } catch (final Exception e) {
+                final String message = parseStoreException(e, store, query);

Review comment:
       Should `parseStoreException`'s first parameter be `Exception` then?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -677,18 +638,56 @@ public void shouldHandlePingQuery() {
         }
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null

Review comment:
        I think I'm still a bit confused here about the global result. I'm thinking we should not have supported this, and hence it should always be `null` here. Is that right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
        @vvcephei WDYT about having an extended `QueryResultInBytes` (just a placeholder name) on top of `QueryResult<byte[]>` and moving this function to that extended class? That way we can avoid mistakenly using the swap function.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##########
@@ -34,7 +34,7 @@
  * <p>
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
       Just realized: we call the range query of kv-store `RangeQuery`, and the range query of window store `WindowRangeQuery`, and similarly for key queries. Is that intentional?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -54,22 +55,22 @@
         );
     }
 
-    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+    @SuppressWarnings("rawtypes")
+    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
         mkMap(
-            mkEntry(
-                PingQuery.class,
-                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
-            ),
             mkEntry(
                 RangeQuery.class,
                 StoreQueryUtils::runRangeQuery
+            ),
+            mkEntry(KeyQuery.class,
+                    StoreQueryUtils::runKeyQuery

Review comment:
       Seems misaligned.







[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771744164



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound positionBound,
+                                                  final boolean collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment:
        @vvcephei but in this PR at least, should we use `swap` here as well?







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771472556



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
        What's happening here is that we're turning a `QueryResult<R>` into a `QueryResult<V>`. A concrete example (in fact, the only use case) of this is in the MeteredStore: we get back a raw result from the BytesStore and need to deserialize it, so we need to convert the `QueryResult<byte[]>` into a `QueryResult<Integer>` or something.
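    In code, the pattern being described boils down to something like this sketch (simplified: the timestamped vs. non-timestamped deserializer selection discussed on another thread is omitted here):
    
    ```java
    final QueryResult<byte[]> rawResult =
        wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
    if (rawResult.isSuccess()) {
        // Deserialize the raw bytes and swap them into a typed result, keeping the
        // rest of the QueryResult (e.g. execution info) intact.
        final Deserializer<V> deserializer = serdes.valueSerde().deserializer();
        final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
        result = (QueryResult<R>) rawResult.swapResult(value);
    } else {
        // Failures carry no result to convert, so they pass through unchanged.
        result = (QueryResult<R>) rawResult;
    }
    ```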







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733605



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
       Why not use two objects?







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735231



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##########
@@ -34,7 +34,7 @@
  * <p>
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
       Good point. We can rename it to KeyRangeQuery in a follow-on PR. I'll file a Jira.










[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771738126



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
       Not sure if I fully understand, but this might be less important.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735440



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
       sounds good!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771470456



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
       In the case of a failure, there is no result, just the failure message. I wanted to maintain an invariant that there is always either a failure or a result, but not both or neither. I also didn't think it would be right to allow accidentally converting a failure to a successful result via this method.
   
   I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.
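   A minimal sketch of the stricter variant, assuming the private value constructor shown in the diff plus `getFailureReason()`/`getExecutionInfo()` accessors on `QueryResult`:
   
   ```java
   public <V> QueryResult<V> swapResult(final V value) {
       if (isFailure()) {
           // A failed result carries a failure but no value, so refuse to swap
           // rather than silently carrying the failure over to a new type.
           throw new IllegalStateException(
               "Cannot swap the result of a failed query: " + getFailureReason());
       } else {
           final QueryResult<V> swapped = new QueryResult<>(value);
           swapped.getExecutionInfo().addAll(getExecutionInfo());
           return swapped;
       }
   }
   ```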




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771493968



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");

Review comment:
       Thanks; this seems about the same, and it would apply to all the other execution info messages we've got, so I think I'll keep it the same for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
       > I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.
   
   If there is no strict reason not to throw an `IllegalStateException`, I would strongly advocate to throw. It not only guards against potential bugs, but also expresses the semantics for developers (i.e., us) much more clearly and makes the code easier to read/reason about.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771520958



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                PingQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
+            ),
+            mkEntry(KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> {
+                    if (store instanceof KeyValueStore) {
+                        final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+                        final KeyValueStore<Bytes, byte[]> keyValueStore =
+                            (KeyValueStore<Bytes, byte[]>) store;
+                        try {
+                            final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                            return QueryResult.forResult(bytes);
+                        } catch (final Throwable t) {

Review comment:
       Good point. It's fine to catch Throwables, but it's not fine to swallow Errors, as I'm doing here.
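   A hedged sketch of the agreed fix, assuming a `QueryResult.forFailure(FailureReason, String)` factory alongside the `STORE_EXCEPTION` reason added in this PR; catching `Exception` instead of `Throwable` lets `Error`s keep propagating:
   
   ```java
   try {
       final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
       return QueryResult.forResult(bytes);
   } catch (final Exception exception) {
       // Exceptions become a STORE_EXCEPTION failure whose message carries the
       // stack trace; Errors (e.g. OutOfMemoryError) are no longer swallowed.
       final StringWriter stringWriter = new StringWriter();
       exception.printStackTrace(new PrintWriter(stringWriter));
       return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, stringWriter.toString());
   }
   ```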




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771513674



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface

Review comment:
       The compiler is smart enough. It's just an informative annotation. Its only practical purpose is to raise a compilation error if you try to declare more than one abstract method in it.
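   For illustration, the guard applied to the `QueryHandler` interface from this PR (the `describe()` default method is only an example, not part of the PR):
   
   ```java
   import org.apache.kafka.streams.processor.StateStore;
   import org.apache.kafka.streams.query.PositionBound;
   import org.apache.kafka.streams.query.Query;
   import org.apache.kafka.streams.query.QueryResult;
   
   @FunctionalInterface
   public interface QueryHandler {
       QueryResult<?> apply(
           Query<?> query,
           PositionBound positionBound,
           boolean collectExecutionInfo,
           StateStore store
       );
   
       // QueryResult<?> secondAbstractMethod();  // uncommenting this breaks compilation
   
       // default and static methods do not count against the single-abstract-method rule
       default String describe() {
           return "QueryHandler";
       }
   }
   ```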




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#issuecomment-998165138


   Thanks, all! This is all really good feedback, and we clearly have some opportunities to tighten up both the API and internals.
   
   I will go through the follow-on tickets that are filed in Jira and mark them as 3.2 blockers. That way, we can be sure that we will either address the API issues before the release or pull IQv2 from the release.
   
   Immediately after merging, I will follow up with a PR to remove `QueryResult#swapResult` from the public API. I think both the fact that it's an instance method and its name were misleading, but after reflection it also doesn't really need to be part of the caller-facing result class at all.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#issuecomment-992922162


   Note to reviewers: upon reflection, and based on the discussions on the other ongoing KIPs to add IQv2 queries, I've decided to drop the "RawKeyQuery" that I had originally proposed. It really just saved us from one extra cast in an execution path that already has a ton of other casts. It was an attempt to be a little elegant, but I don't think it was successful.
   
   I'm hoping that once we have several queries in place, we'll be able to golf it a bit and come up with an actually more elegant approach to the internal code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771731855



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return result;
     }
 
+
     @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final Deserializer<V> deserializer = getValueDeserializer();
+            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final QueryResult<V> typedQueryResult =
+                rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private Deserializer<V> getValueDeserializer() {
-        final Serde<V> vSerde = serdes.valueSerde();
+        final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
         final Deserializer<V> deserializer;
-        if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review comment:
       This is the part that I'm not completely sure about either... maybe some quick sync on this would be more effective?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
       > I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.
   
   If there is no strict reason not to throw an `IllegalStateException`, I would strongly advocate to throw. It not only guards against potential bugs, but also expresses the semantics for developers (i.e., us) much more clearly and makes the code easier to read/reason about.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
       Why do we not allow swapping the result if the current result is a failure? And if we don't want to allow swapping, why just return `this` rather than throw an exception?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771741301



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -253,12 +262,17 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
             rawRangeQuery = RangeQuery.withNoBounds();
         }
         final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-                wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
+            wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
             final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
-                    iterator, getSensor, getValueDeserializer());
-            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(resultIterator);
+                iterator,
+                getSensor,
+                getValueDeserializer()
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult(
+                resultIterator

Review comment:
       Probably autoformatted because the line was too long.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771741132



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##########
@@ -34,7 +34,7 @@
  * <p>
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-13554




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771745563



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -677,18 +638,56 @@ public void shouldHandlePingQuery() {
         }
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null

Review comment:
       This line was written before I scratched global store support from the current scope. I'll drop the check from this test for now.
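   To make the caller-facing flow concrete, a hedged usage sketch assembled from this test; `KafkaStreams#query`, `PositionBound.unbounded()`, and `getOnlyPartitionResult()` come from the IQv2 framework classes referenced here, and the store name and types are placeholders:
   
   ```java
   import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
   
   final KeyQuery<Integer, Integer> query = KeyQuery.withKey(1);
   final StateQueryRequest<Integer> request =
       inStore("my-store")
           .withQuery(query)
           .withPositionBound(PositionBound.unbounded());
   
   final StateQueryResult<Integer> result = kafkaStreams.query(request);
   
   // With global stores out of scope, only partition results are populated,
   // and a single key lives in exactly one partition.
   final QueryResult<Integer> queryResult = result.getOnlyPartitionResult();
   if (queryResult.isSuccess()) {
       System.out.println("value = " + queryResult.getResult());
   }
   ```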




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
       Why do we not allow swapping the result if the current result is a failure? And if we don't want to allow swapping, why just return `this` rather than throw an exception?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
       Does it make sense to add execution info in this case? We pushed the query down but did not handle it, so the inner store would track it (and we should not also track it here)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");

Review comment:
       nit: `in` -> `within`
   (also: maybe add `.` at the end?)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
       > But Streams has a store layer that inserts dummy timestamps to make everything conform to the same types.
   
   We only do this for the DSL. For the PAPI, the "plain KeyValueStore" can still be used as-is, and we don't modify anything about it. Thus IQ must be able to handle both cases.
   
   Also, if we add this facade, we would use a `MeteredTimestampedKeyValueStore`, not this class. I think we need to move the logic for the timestamped case into the other class.
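   For readers following this thread, here is the same selection logic from the diff with explanatory comments added (a sketch of the current PR state, not of the proposed move into `MeteredTimestampedKeyValueStore`):
   
   ```java
   @SuppressWarnings({"unchecked", "rawtypes"})
   private Deserializer<V> getValueDeserializer() {
       final Serde<V> valueSerde = serdes.valueSerde();
       final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
       if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
           // DSL stores are configured with a ValueAndTimestamp serde, but a
           // non-timestamped inner byte store returns plain value bytes, so
           // unwrap the serde and use the inner value deserializer directly.
           final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
               (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) valueSerde).deserializer();
           return (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
       }
       // PAPI / plain stores (and truly timestamped stores): use the serde as-is.
       return valueSerde.deserializer();
   }
   ```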

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {

Review comment:
       nit: indentation (reads like 2 parameters, but there are 3)

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
       The generic type of `QueryResult` is `R` -- it does not seem safe to allow swapping in a `V` result?
   
   If we need to allow for different return types, why does `QueryResult` have a generic parameter to begin with? It seems useless if we cannot rely on it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {

Review comment:
       nit: indentation
   
   Very hard to read.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");

Review comment:
       Use `Time`, not `System`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -29,10 +29,10 @@
  */
 public final class QueryResult<R> {
 
-    private final List<String> executionInfo = new LinkedList<>();
     private final FailureReason failureReason;
     private final String failure;
     private final R result;
+    private List<String> executionInfo = new LinkedList<>();

Review comment:
       Why not keep it `final` and use `clear()` and `addAll` instead?
   
   For my own education: what is `executionInfo` (and why is it a `String` list)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();

Review comment:
       We should use `Time`
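   A minimal sketch of the suggested change, assuming the metered store's existing `Time` field (`time`) is in scope; `Time.SYSTEM` backs production code while `MockTime` keeps tests deterministic:
   
   ```java
   final long start = time.nanoseconds();    // org.apache.kafka.common.utils.Time
   final QueryResult<R> result =
       wrapped().query(query, positionBound, collectExecutionInfo);
   if (collectExecutionInfo) {
       result.addExecutionInfo(
           "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
   }
   return result;
   ```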

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();

Review comment:
       nit: `vSerde` -> `valueSerde`

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+@Evolving
+public class KeyQuery<K, V> implements Query<V> {

Review comment:
       Should this class be `final`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface

Review comment:
       We have a lot of functional interfaces, but I don't think we annotate them anywhere. Also wondering what we gain by doing it? The Java compiler should be smart enough to not need this annotation?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -42,16 +102,21 @@ private StoreQueryUtils() {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
       Seems I don't understand the control flow. I thought we handle queries inside the state store. What is `handleBasicQuery`? And why does the "query map" know about `KeyValueStore` (should it not be store-type agnostic)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
       Is this a valid condition? It seems that if the store is not timestamped, the serde should never be a `ValueAndTimestamp` one?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
       This should always return `false` (cf. the comment below).

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
       What is the difference between the map within `MeteredKeyValueStore` and this one? Why do we need both?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;

Review comment:
       nit: `typedQuery` -> `keyQuery` (or better `typedKeyQuery`, to align with `rawKeyQuery`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                PingQuery.class,

Review comment:
       Maybe I should read the KIP, but what is a `PingQuery`? (Well, I can guess, but what is its purpose, i.e., why is it useful?)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                PingQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
+            ),
+            mkEntry(KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> {
+                    if (store instanceof KeyValueStore) {
+                        final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+                        final KeyValueStore<Bytes, byte[]> keyValueStore =
+                            (KeyValueStore<Bytes, byte[]>) store;
+                        try {
+                            final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                            return QueryResult.forResult(bytes);
+                        } catch (final Throwable t) {

Review comment:
       nit: `t` -> `throwable` (or just `exception`?)
   
   btw: is it really best practice to catch `Throwable`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##########
@@ -52,5 +52,12 @@
      * The requested store partition does not exist at all. For example, partition 4 was requested,
      * but the store in question only has 4 partitions (0 through 3).
      */
-    DOES_NOT_EXIST;
+    DOES_NOT_EXIST,
+
+    /**
+     * The store that handled the query got an exception during query execution. The message
+     * will contain the exception details. Depending on the nature of the exception, the caller
+     * may be able to retry this instance or may need to try a different instance.
+     */
+    STORE_EXCEPTION;

Review comment:
       I did not follow the details of the KIP discussion, but I am wondering to what extent this new `FailureReason` class voids https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
       > so instead we just pass through the non-timestamped value from the bytes store and strip off the ValueAndTimestamp serde so we can deserialize the raw value.
   
   Not sure if I understand how we would do this? We don't control the inner stores?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;

Review comment:
      Seems to apply to `FailureReason`, too? There are no class JavaDocs.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771746009



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return result;
     }
 
+
     @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final Deserializer<V> deserializer = getValueDeserializer();
+            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final QueryResult<V> typedQueryResult =
+                rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private Deserializer<V> getValueDeserializer() {
-        final Serde<V> vSerde = serdes.valueSerde();
+        final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
         final Deserializer<V> deserializer;
-        if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review comment:
       I know it's weird, but it is correct. I would like to revisit it, but I think we really need to do that after the current round of queries are implemented.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771742317



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound positionBound,
+                                                  final boolean collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment:
       Thanks; let's keep that in mind as we tackle some of the API refactor tasks we've queued up. We started with the RawXQuery approach, then dropped it. Before we add it back, I think we'd better have a representative set of queries and also bear in mind all the other sharp edges we'd like to smooth over before release.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771454005



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {

Review comment:
       The purpose of this method is to allow `MeteredKeyValue` store to deserialize the result without wiping out the execution info or position that it got back from the bytes store. I missed that while reviewing your PR, so I went ahead and added a fix for it to this one.







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771749966



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
       Sure.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771490884



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();

Review comment:
       Not a bad idea!







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737277



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
       > The MeteredStore's serde is always a ValueAndTimestamp serde regardless of whether the inner store is Timestamped or not. 
   
   Is it? (1) We also have `MeteredTimestampStore` (of course it extends `MeteredStore`), but it seems better to split the logic and move everything timestamp-related into `MeteredTimestampStore`. (2) For PAPI users, they can add a plain `KeyValueStore` and we won't wrap it with the `TimestampedStore` facade, and the serdes won't be `ValueAndTimestamp` either.
   
   > What we do is, when you have a non-timestamped store, we wrap it with an extra layer (org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter) that pads the returned values with a fake timestamp (org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat
   
   We only do this in the DSL, if the user gives us a non-timestamped store via `Materialized` -- but for PAPI users, we never do this and use whatever store is given to us as-is.
   
   > so we did not implement the same padding logic for non-timestamped data and instead just bubble up to the MeteredStore
   
   Not sure I can follow? It should not be a concern for IQ? Also, the current conversion between plain/timestamped is really just a corner case (and a case that we want to deprecate anyway -- we just did not find a way to do so -- maybe we should add a runtime check at some point and WARN users if they provide a non-timestamped store, until we remove support for it and throw an exception instead...). Seems not worth adding more tech debt for behavior that we only added to avoid breaking stuff.
   
   > Which means that if we want to deserialize it, we need to know whether to use the ValueAndTimestamp deserializer or just the Value's deserializer.
   
   Yes, but we should split this logic between the plain `MeteredStore` and the `MeteredTimestampStore`.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r767856835



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -79,6 +88,14 @@
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
 
+    private  Map<Class, QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo)
+            )
+        );
+

Review comment:
       Just trying to establish some pattern here that can let us dispatch these queries efficiently. This O(1) lookup should be faster than an O(n) if/else check or an O(log n) string switch statement, but we won't know for sure without benchmarking.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -216,9 +269,17 @@ public boolean global() {
 
         public abstract StoreSupplier<?> supplier();
 
+        public boolean timestamped() {
+            return true; // most stores are timestamped
+        };
+
         public boolean global() {
             return false;
         }
+
+        public boolean keyValue() {
+            return false;
+        }

Review comment:
       These help us adjust our expectations in the validations below, so that we can cover all store types in the same test.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -513,6 +590,43 @@ public void shouldHandlePingQuery() {
         assertThat(result.getPosition(), is(INPUT_POSITION));
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null
+                ? result.getGlobalResult()
+                : result.getOnlyPartitionResult();
+        final boolean failure = queryResult.isFailure();
+        if (failure) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class,
+            queryResult::getFailureMessage);
+
+        final V result1 = queryResult.getResult();
+        final Integer integer = valueExtactor.apply(result1);
+        assertThat(integer, is(expectedValue));

Review comment:
       Here's where we run that function to either get the value out of the ValueAndTimestamp or just give back the value with the identity function.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##########
@@ -52,5 +52,12 @@
      * The requested store partition does not exist at all. For example, partition 4 was requested,
      * but the store in question only has 4 partitions (0 through 3).
      */
-    DOES_NOT_EXIST;
+    DOES_NOT_EXIST,
+
+    /**
+     * The store that handled the query got an exception during query execution. The message
+     * will contain the exception details. Depending on the nature of the exception, the caller
+     * may be able to retry this instance or may need to try a different instance.
+     */
+    STORE_EXCEPTION;

Review comment:
       I realized in the implementation for RocksDB that we will need to account for runtime exceptions from the stores. I'll update the KIP.
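
   On the caller side this would be checked like the existing reasons; a rough sketch (the `stateQueryResult` variable and the logging are illustrative, the accessors are the ones exercised in the tests below):

   ```java
   final QueryResult<byte[]> queryResult = stateQueryResult.getOnlyPartitionResult();
   if (queryResult.isFailure()
           && queryResult.getFailureReason() == FailureReason.STORE_EXCEPTION) {
       // The failure message carries the exception details; depending on them,
       // retry against this instance or route the query to another one.
       System.err.println("Store failure: " + queryResult.getFailureMessage());
   }
   ```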

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -426,6 +487,22 @@ public void verifyStore() {
             shouldHandlePingQuery();
             shouldCollectExecutionInfo();
             shouldCollectExecutionInfoUnderFailure();
+
+            if (storeToTest.keyValue()) {
+                if (storeToTest.timestamped()) {
+                    shouldHandleKeyQuery(
+                        2,
+                        (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+                        2
+                    );
+                } else {
+                    shouldHandleKeyQuery(
+                        2,
+                        Function.identity(),
+                        2
+                    );
+                }
+            }

Review comment:
       Here's where we use those properties. KeyQueries are only implemented for KeyValue stores. For Timestamped stores, we get back a ValueAndTimestamp, which we extract the value from before making the assertion. Otherwise, we just get back the value and can assert directly on it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
       Note for the reviewers in case this is mysterious: We support one store, the non-timestamped RocksDB store (`Stores.persistentKeyValueStore`), which does not return a ValueAndTimestamp tuple, but just a raw value. But Streams has a store layer that inserts dummy timestamps to make everything conform to the same types. It's counter to the goals of IQv2 to spend time and memory copying the result arrays to add the timestamp only to strip it off again later (as in IQv1), so instead we just pass through the non-timestamped value from the bytes store and strip off the ValueAndTimestamp serde so we can deserialize the raw value.
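
   For context, the padding being skipped here looks roughly like this (an illustrative sketch of the timestamped byte format, not the exact adapter code; the real logic lives in `TimestampedBytesStore#convertToTimestampedFormat`):

   ```java
   static byte[] convertToTimestampedFormatSketch(final byte[] plainValue) {
       if (plainValue == null) {
           return null;
       }
       // prepend an 8-byte placeholder timestamp to the plain value
       return java.nio.ByteBuffer.allocate(Long.BYTES + plainValue.length)
           .putLong(-1L)
           .put(plainValue)
           .array();
   }
   ```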







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771512115



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
       I'd forgotten about MeteredTimestampedKeyValueStore, but now that I'm looking at it, what it does is extend the MeteredKeyValueStore, apparently specifically to pad the value serde with a ValueAndTimestamp serde. Otherwise, all the logic lives in MeteredKeyValueStore.
   
   Also, because we always wrap the non-timestamped store with the `KeyValueToTimestampedKeyValueByteStoreAdapter`, we always pass through the MeteredTimestampedKeyValueStore whether the inner store is really timestamped or not.
   
   I think we could clean this whole hierarchy up a bit, but it's not necessary as part of this work.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771530728



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -42,16 +102,21 @@ private StoreQueryUtils() {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
       Yep, that's accurate, but many of the stores will have the exact same logic as each other, so it made sense to consolidate it, which is what this util class is for.
   
   The function in the query map just checks the type of the store so that it can either cast it to execute the query or return "unknown query". That way, we can use the same dispatch map for all queries.
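
   So each handler's body is roughly shaped like this (a sketch, not the exact PR code -- the full version also catches store exceptions and threads the position bound and execution info through, and it assumes the `QueryResult.forUnknownQueryType` factory from the framework):

   ```java
   @SuppressWarnings("unchecked")
   private static <R> QueryResult<R> runKeyQuerySketch(final Query<R> query,
                                                       final StateStore store) {
       if (store instanceof KeyValueStore) {
           final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
           final KeyValueStore<Bytes, byte[]> keyValueStore = (KeyValueStore<Bytes, byte[]>) store;
           // the store type matches, so execute directly against the bytes store
           return (QueryResult<R>) QueryResult.forResult(keyValueStore.get(rawKeyQuery.getKey()));
       } else {
           // this store type cannot serve the query
           return QueryResult.forUnknownQueryType(query, store);
       }
   }
   ```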







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771687546



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##########
@@ -34,7 +34,7 @@
  * <p>
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
       I had a review comment to add this to KeyQuery, so I added it to RangeQuery for exactly the same reason.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java
##########
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.query.Query;
-
-/**
- * A very simple query that all stores can handle to verify that the store is participating in the
- * IQv2 framework properly.
- * <p>
- * This is not a public API and may change without notice.
- */
-public class PingQuery implements Query<Boolean> {

Review comment:
       Removed, since it was only for validating the framework in the absence of any query implementations, and now we have query implementations.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -54,22 +55,22 @@
         );
     }
 
-    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+    @SuppressWarnings("rawtypes")
+    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
         mkMap(
-            mkEntry(
-                PingQuery.class,
-                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
-            ),

Review comment:
      Removed; see the comment on the PingQuery class.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {

Review comment:
       Changed from Throwable to Exception to avoid swallowing Errors

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -527,10 +541,11 @@ public void verifyStore() {
     private void globalShouldRejectAllQueries() {
         // See KAFKA-13523
 
-        final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query);
+        final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);

Review comment:
       Also replaced the PingQuery here. It also doesn't affect the evaluation.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -630,29 +609,11 @@ public void shouldHandlePingQuery() {
                 .withQuery(query)
                 .withPartitions(mkSet(0, 1))
                 .withPositionBound(PositionBound.at(INPUT_POSITION));
-
         final StateQueryResult<KeyValueIterator<Integer, V>> result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         if (result.getGlobalResult() != null) {
-            final QueryResult<KeyValueIterator<Integer, V>> queryResult = result.getGlobalResult();
-            final boolean failure = queryResult.isFailure();
-            if (failure) {
-                throw new AssertionError(queryResult.toString());
-            }
-            assertThat(queryResult.isSuccess(), is(true));
-
-            assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
-            assertThrows(IllegalArgumentException.class,
-                queryResult::getFailureMessage);
-
-            final KeyValueIterator<Integer, V> iterator = queryResult.getResult();
-            final Set<Integer> actualValue = new HashSet<>();
-            while (iterator.hasNext()) {
-                actualValue.add(valueExtactor.apply(iterator.next().value));
-            }
-            assertThat(actualValue, is(expectedValue));
-            assertThat(queryResult.getExecutionInfo(), is(empty()));
+            fail("global tables aren't implemented");

Review comment:
       This was covered in a prior PR. It will be fixed in https://issues.apache.org/jira/browse/KAFKA-13523

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
##########
@@ -178,26 +180,26 @@ public static void after() {
 
     @Test
     public void shouldFailUnknownStore() {
-        final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request =
+        final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);

Review comment:
       The query itself doesn't matter for these evaluations, so I just arbitrarily swapped KeyQuery in for PingQuery

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##########
@@ -52,7 +52,6 @@ private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
      * @param upper The key that specifies the upper bound of the range
      * @param <K> The key type
      * @param <V> The value type
-     * @return An iterator of records

Review comment:
       Noticed these, which are not correct. This method returns a query, not an iterator of records.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -209,42 +215,45 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
                                     final PositionBound positionBound,
                                     final boolean collectExecutionInfo) {
 
-        final long start = System.nanoTime();
+        final long start = time.nanoseconds();
         final QueryResult<R> result;
 
         final QueryHandler handler = queryHandlers.get(query.getClass());
         if (handler == null) {
             result = wrapped().query(query, positionBound, collectExecutionInfo);
             if (collectExecutionInfo) {
                 result.addExecutionInfo(
-                        "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
             }
         } else {
             result = (QueryResult<R>) handler.apply(
-                    query,
-                    positionBound,
-                    collectExecutionInfo,
-                    this
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this

Review comment:
       Sorry about this. The last PR contained some bad formatting, so I'm just biting the bullet and fixing it here.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771515988



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
       They both exist to dispatch query execution logic. The MeteredStores' logic is to translate results from the inner stores, and the inner stores' logic is to execute the query. Since we have a lot of functionally identical stores (i.e., many KeyValue stores, etc.), it made sense to consolidate their execution logic here instead of duplicating it in every store class.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771736938



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
      Thanks, @mjsax, I'm not sure precisely what you mean. This does create a new object. If you think it would be clearer to add a constructor allowing people to set the result along with a pre-populated executionInfo and position instead, we could, but this is the API we agreed on in the KIP.
   
   I want this new API to have good ergonomics, so I do want to consider these, but I don't think we need to hold up the KeyQuery PR on it.







[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735849



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
      Thanks, @guozhangwang, I think something like that will be the outcome of this follow-on work: https://issues.apache.org/jira/browse/KAFKA-13526
   
   We'll tackle that question before the first release of this new API.







[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771749813



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
      I mean, if the upper layers need `QueryResult<R>` with the proper type, and the inner layers need `QueryResult<byte[]>`, we should just have two properly typed objects, and instead of "swapping", just take the `byte[]` from the `QueryResult<byte[]>`, deserialize it, and stuff the result into the `QueryResult<R>` object.
   
   > but this is the API we agreed on in the KIP.
   
   I did not read the KIP :D (maybe I should have). And we can always adjust it. To me it seems useless to have a generic type parameter if we don't obey it anyway and use casts (it's the purpose of generics to avoid casts, and if it does not avoid casts, it seems pointless to have it).
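
   Concretely, something like this (a sketch of the suggestion, not code from this PR; it assumes `QueryResult` exposes `setPosition`/`getPosition` and `getExecutionInfo`/`addExecutionInfo` as used elsewhere in the framework):

   ```java
   final QueryResult<byte[]> rawResult =
       wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
   final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
   // build a properly typed result instead of swapping/casting the raw one
   final QueryResult<V> typedResult = QueryResult.forResult(value);
   typedResult.setPosition(rawResult.getPosition());
   for (final String info : rawResult.getExecutionInfo()) {
       typedResult.addExecutionInfo(info);
   }
   ```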







[GitHub] [kafka] vvcephei merged pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11582:
URL: https://github.com/apache/kafka/pull/11582


   

