You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/15 23:00:15 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
       Why do we know allow to swap the result if the current result has a failure? And if we don't want to allow swapping, why just return `this` but not throw an exception?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
       Does it make sense to add execution info for this case? We pushed the query down, but did not handle it and thus the inner store would track (and we should not track)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");

Review comment:
       nit: `in` -> `within`
   (also: maybe add `.` at the end?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
       > But Streams has a store layer that inserts dummy timestamps to make everything conform to the same types.
   
   We only do this for the DSL. For the PAPI, the "plain KeyValueStore" can still be used as-is, and we don't modify anything about it. Thus IQ must be able to handle both cases.
   
   Also, if we add this face, we would use a `MeteredTimestampedKeyValueStore` not this class. I think we need to move the logic for the timestamped case into the other class.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {

Review comment:
       nit: indenting (read like 2 parameters, but has 3)

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
       The generic type of `QueryResult` is `R` -- it does not seem safe to allow swapping in a `V` result?
   
   If we need to allow for different return types, why does `QueryType` has a generic parameter to begin with? Seems useless if we cannot rely on it?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {

Review comment:
       nit indention
   
   very hard to read

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");

Review comment:
       Use `Time` no` System`

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -29,10 +29,10 @@
  */
 public final class QueryResult<R> {
 
-    private final List<String> executionInfo = new LinkedList<>();
     private final FailureReason failureReason;
     private final String failure;
     private final R result;
+    private List<String> executionInfo = new LinkedList<>();

Review comment:
       Why not keep it `final` and use `clear()` and `addAll` instead?
   
   For my own education: what is `executionInfo` (and why is it a `String` list)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();

Review comment:
       We should use `Time`

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();

Review comment:
       nit `vSerde` -> `valueSerde`

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+@Evolving
+public class KeyQuery<K, V> implements Query<V> {

Review comment:
       Should this class be `final`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface

Review comment:
       We have a lot of functional interfaces but I don't think we annotated it anywhere. Also wondering what we gain by doing it? The Java compiler should be smart enough and not need this annotation? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -42,16 +102,21 @@ private StoreQueryUtils() {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
       Seems I don't understand the control flow. I thought we handle queries inside state store. What is `handleBasicQuery`? And why does the "query map" know about `KeyValueStore` (should it not be store type agnostic)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
       Is this a valid condition? It seems if the store is not timestamped, there serde should never be a `ValueAndTimstamp` one?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
       This should always return `false` (cf comment below).

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
       What is the difference between the map within `MeteredKeyValueStore` and this one? Why do we need both?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;

Review comment:
       nit: `typedQuery` -> `keyQuery` (or better `typedKeyQuery` to align to `rawKeyQuery``)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                PingQuery.class,

Review comment:
       Maybe I should read the KIP, but was it a `PingQuery` (well, I can guess, but what's its purpose (why is it useful)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                PingQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
+            ),
+            mkEntry(KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> {
+                    if (store instanceof KeyValueStore) {
+                        final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+                        final KeyValueStore<Bytes, byte[]> keyValueStore =
+                            (KeyValueStore<Bytes, byte[]>) store;
+                        try {
+                            final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                            return QueryResult.forResult(bytes);
+                        } catch (final Throwable t) {

Review comment:
       nit: `t` -> `throwable` (of just `exception`?)
   
   btw: is it really best practice to catch `Throwable`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##########
@@ -52,5 +52,12 @@
      * The requested store partition does not exist at all. For example, partition 4 was requested,
      * but the store in question only has 4 partitions (0 through 3).
      */
-    DOES_NOT_EXIST;
+    DOES_NOT_EXIST,
+
+    /**
+     * The store that handled the query got an exception during query execution. The message
+     * will contain the exception details. Depending on the nature of the exception, the caller
+     * may be able to retry this instance or may need to try a different instance.
+     */
+    STORE_EXCEPTION;

Review comment:
       I did not follow the details of the KIP discussion, but wondering to what extend this new `FailureReason` class voids https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
       > so instead we just pass through the non-timestamped value from the bytes store and strip off the ValueAndTimestamp serde so we can deserialize the raw value.
   
   Not sure if I understand how we would do this? We don't control the inner stores?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;

Review comment:
       Seems to apply to `FailureReason`, too? There are not class JavaDocs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org