Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/03 11:11:49 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

cadonna commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r761760939



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =

Review comment:
       nit: Could you give a more meaningful name to this variable?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}), a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * example, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Note: enforces monotonicity on offsets. I.e., if there is already a component for the same
+     * topic and partition with a larger offset, the update will succeed but not overwrite the
+     * offset.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .compute(
+                partition,
+                (integer, prior) -> prior == null || offset > prior ? offset : prior
+            );
+        return this;
+    }
+
+    /**
+     * Create a deep copy of the Position.
+     */
+    public Position copy() {
+        return new Position(deepCopy(position));
+    }
+
+    /**
+     * Create a new, structurally independent Position that is the result of merging two other
+     * Positions.
+     * <p>
+     * If both Positions contain the same topic -> partition -> offset mapping, the resulting
+     * Position will contain a mapping with the larger of the two offsets.
+     */
+    public Position merge(final Position other) {
+        if (other == null) {
+            return this;

Review comment:
       It is a bit unexpected that the result is the instance itself; I would have expected a new instance. Anyway, could you document this special case in the javadocs?
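
       To illustrate the alternative, here is a minimal sketch in which `merge(null)` returns a structurally independent copy rather than `this`. `SketchPosition` and its `offset` accessor are hypothetical, simplified stand-ins written only for this example; they are not the `Position` class from the PR, and the PR may well keep the current behavior and just document it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for Position; not the class from the PR.
final class SketchPosition {
    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;

    private SketchPosition(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
        this.position = position;
    }

    static SketchPosition emptyPosition() {
        return new SketchPosition(new ConcurrentHashMap<>());
    }

    // Same idea as withComponent in the diff: offsets only ever move forward.
    SketchPosition withComponent(final String topic, final int partition, final long offset) {
        position
            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
            .merge(partition, offset, Long::max);
        return this;
    }

    // Deep copy: neither the outer nor the inner maps are shared.
    SketchPosition copy() {
        final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy = new ConcurrentHashMap<>();
        for (final Map.Entry<String, ConcurrentHashMap<Integer, Long>> entry : position.entrySet()) {
            copy.put(entry.getKey(), new ConcurrentHashMap<>(entry.getValue()));
        }
        return new SketchPosition(copy);
    }

    // Assumption for this sketch: a null argument is treated like an empty position,
    // so the caller always receives a new instance.
    SketchPosition merge(final SketchPosition other) {
        final SketchPosition result = copy();
        if (other != null) {
            for (final Map.Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
                for (final Map.Entry<Integer, Long> component : entry.getValue().entrySet()) {
                    result.withComponent(entry.getKey(), component.getKey(), component.getValue());
                }
            }
        }
        return result;
    }

    // Hypothetical accessor used only to inspect the sketch.
    Long offset(final String topic, final int partition) {
        final Map<Integer, Long> partitions = position.get(topic);
        return partitions == null ? null : partitions.get(partition);
    }
}
```

       With this shape, later mutations of the original never leak into the merged result, at the cost of one extra copy in the null case.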

##########
File path: checkstyle/suppressions.xml
##########
@@ -164,7 +164,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>
+              files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>

Review comment:
       Is this really needed?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -119,4 +124,32 @@ default void init(final StateStoreContext context, final StateStore root) {
      * @return {@code true} if the store is open
      */
     boolean isOpen();
+
+    /**
+     * Execute a query. Returns a QueryResult containing either result data or
+     * a failure.
+     * <p>
+     * If the store doesn't know how to handle the given query, the result
+     * shall be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+     * If the store couldn't satisfy the given position bound, the result
+     * shall be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * <p>
+     * Note to store implementers: if your store does not support position tracking,
+     * you can correctly respond {@link FailureReason#NOT_UP_TO_BOUND} if the argument is
+     * anything but {@link PositionBound#unbounded()}. Be sure to explain in the failure message
+     * that bounded positions are not supported.
+     * <p>
+     * @param query The query to execute
+     * @param positionBound The position the store must be at or past
+     * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
+     * @param <R> The result type
+     */
+    @Evolving
+    default <R> QueryResult<R> query(
+        Query<R> query,
+        PositionBound positionBound,
+        boolean collectExecutionInfo) {
+        // If a store doesn't implement a query handler, then all queries are unknown.

Review comment:
       Could you add this to the javadocs? It seems to be important information for users of the API.
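
       A minimal sketch of how the fallback could be spelled out in the javadocs of the default method. All `Sketch*` types below are hypothetical stand-ins so that the example compiles on its own, and the wording of the Javadoc is only a suggestion, not text from the PR.

```java
// Hypothetical stand-ins for FailureReason and QueryResult, reduced to what the sketch needs.
enum SketchFailureReason { UNKNOWN_QUERY_TYPE, NOT_UP_TO_BOUND }

final class SketchQueryResult<R> {
    private final SketchFailureReason failureReason;
    private final String failureMessage;

    private SketchQueryResult(final SketchFailureReason failureReason, final String failureMessage) {
        this.failureReason = failureReason;
        this.failureMessage = failureMessage;
    }

    static <R> SketchQueryResult<R> forUnknownQueryType(final Object query, final Object store) {
        return new SketchQueryResult<>(
            SketchFailureReason.UNKNOWN_QUERY_TYPE,
            "This store (" + store.getClass() + ") doesn't know how to execute "
                + "the given query (" + query + ").");
    }

    SketchFailureReason getFailureReason() {
        return failureReason;
    }

    String getFailureMessage() {
        return failureMessage;
    }
}

// Hypothetical stand-in for the StateStore interface.
interface SketchStateStore {

    /**
     * Execute a query. Returns a result containing either result data or a failure.
     * <p>
     * The default implementation does not handle any query type: it always returns a
     * failed result with reason UNKNOWN_QUERY_TYPE. Store implementers must override
     * this method to make their store queryable.
     *
     * @param query The query to execute
     * @param <R>   The result type
     */
    default <R> SketchQueryResult<R> query(final Object query) {
        return SketchQueryResult.forUnknownQueryType(query, this);
    }
}
```

       A store that does not override `query` then behaves exactly as its public documentation says, without the reader having to look at the method body.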

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position position;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    /**
+     * Static factory method to create a result object for a successful query. Used by StateStores
+     * to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    /**
+     * Static factory method to create a result object for a failed query. Used by StateStores to
+     * respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forFailure(
+        final FailureReason failureReason,
+        final String failureMessage) {
+
+        return new QueryResult<>(failureReason, failureMessage);
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store does
+     * not know how to handle the query.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute "
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new query type.");
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store has
+     * not yet caught up to the requested position bound.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    /**
+     * Used by stores to add detailed execution information (if requested) during query execution.
+     */
+    public void addExecutionInfo(final String message) {
+        executionInfo.add(message);
+    }
+
+    /**
+     * Used by stores to report what exact position in the store's history it was at when it
+     * executed the query.
+     */
+    public void setPosition(final Position position) {
+        this.position = position;
+    }
+
+    /**
+     * True iff the query was successfully executed. The response is available in {@link
+     * #getResult()}.
+     */
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+
+    /**
+     * True iff the query execution failed. More information about the failure is available in
+     * {@link #getFailureReason()} and {@link #getFailureMessage()}.
+     */
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    /**
+     * If detailed execution information was requested in {@link StateQueryRequest#enableExecutionInfo()},
+     * this method returns the execution details for this partition's result.
+     */
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    /**
+     * This state partition's exact position in its history when this query was executed. Can be
+     * used in conjunction with subsequent queries via {@link StateQueryRequest#withPositionBound(PositionBound)}.
+     * <p>
+     * Note: stores are encouraged, but not required to set this property.
+     */
+    public Position getPosition() {
+        return position;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the reason.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public FailureReason getFailureReason() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(

Review comment:
       See my comment above regarding `IllegalArgumentException` vs. `IllegalStateException`. There are other occurrences in this class as well.
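
       A minimal sketch of the distinction, assuming the point of the earlier comment is that these accessors take no arguments, so a call on the wrong kind of result reflects the object's state rather than a bad argument. `SketchResult` is a hypothetical stand-in, not the `QueryResult` class from the PR.

```java
// Hypothetical, reduced stand-in for QueryResult.
final class SketchResult<R> {
    private final R result;
    private final String failureMessage;

    private SketchResult(final R result, final String failureMessage) {
        this.result = result;
        this.failureMessage = failureMessage;
    }

    static <R> SketchResult<R> forResult(final R result) {
        return new SketchResult<>(result, null);
    }

    static <R> SketchResult<R> forFailure(final String failureMessage) {
        return new SketchResult<>(null, failureMessage);
    }

    boolean isFailure() {
        return failureMessage != null;
    }

    // No argument is involved, so the problem is the state of this object.
    String getFailureMessage() {
        if (!isFailure()) {
            throw new IllegalStateException(
                "Cannot get failure message because this query did not fail.");
        }
        return failureMessage;
    }

    R getResult() {
        if (isFailure()) {
            throw new IllegalStateException(
                "Cannot get result for failed query. Failure is: " + failureMessage);
        }
        return result;
    }
}
```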

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries. This wraps the individual partition results, as well
+ * as metadata relating to the result as a whole.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query. Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {
+        this.globalResult = r;
+    }
+
+    /**
+     * Set the result for a partitioned store query. Used by Kafka Streams and available for tests.
+     */
+    public void addResult(final int partition, final QueryResult<R> r) {
+        partitionResults.put(partition, r);
+    }
+
+
+    /**
+     * The query's result for each partition that executed the query. Empty for global store
+     * queries.
+     */
+    public Map<Integer, QueryResult<R>> getPartitionResults() {
+        return partitionResults;
+    }
+
+    /**
+     * For queries that are expected to match records in only one partition, returns the result.
+     *
+     * @throws IllegalArgumentException if the results are not for exactly one partition.
+     */
+    public QueryResult<R> getOnlyPartitionResult() {
+        final List<QueryResult<R>> nonempty =
+            partitionResults
+                .values()
+                .stream()
+                .filter(r -> r.getResult() != null)
+                .collect(Collectors.toList());
+
+        if (nonempty.size() != 1) {
+            throw new IllegalArgumentException(
+                "The query did not return exactly one partition result: " + partitionResults
+            );
+        } else {
+            return nonempty.get(0);
+        }
+    }
+
+    /**
+     * The query's result for global store queries. Is {@code null} for non-global (partitioned)
+     * store queries.
+     */
+    public QueryResult<R> getGlobalResult() {

Review comment:
       I could not find this method in the KIP. Could you update the KIP?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {

Review comment:
       Shouldn't this also be `@Evolving`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+public class PositionTest {
+
+    @Test
+    public void shouldCreateFromMap() {
+        final Map<String, Map<Integer, Long>> map = mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        );
+
+        final Position position = Position.fromMap(map);
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+
+        // Should be a copy of the constructor map
+
+        map.get("topic1").put(99, 99L);
+
+        // so the position is still the original one
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldCreateFromNullMap() {
+        final Position position = Position.fromMap(null);
+        assertThat(position.getTopics(), equalTo(Collections.emptySet()));
+    }
+
+    @Test
+    public void shouldMerge() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position position1 = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset
+            mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition
+            mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic
+        ));
+
+        final Position merged = position.merge(position1);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+    }
+
+    @Test
+    public void shouldUpdateComponentMonotonically() {
+        final Position position = Position.emptyPosition();
+        position.withComponent("topic", 3, 5L);
+        position.withComponent("topic", 3, 4L);
+        assertThat(position.getBound("topic").get(3), equalTo(5L));
+        position.withComponent("topic", 3, 6L);
+        assertThat(position.getBound("topic").get(3), equalTo(6L));
+    }
+
+    @Test
+    public void shouldCopy() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position copy = position.copy();
+
+        // mutate original
+        position.withComponent("topic", 0, 6L);
+        position.withComponent("topic1", 8, 1L);
+        position.withComponent("topic2", 2, 4L);
+
+        // copy has not changed
+        assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(copy.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+
+        // original has changed
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
+    }
+
+    @Test
+    public void shouldMergeNull() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position merged = position.merge(null);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));

Review comment:
       Could you please verify whether the result is the same instance or not? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            final Set<Integer> handledPartitions = new HashSet<>();
+
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(
+                                        FailureReason.NOT_ACTIVE,
+                                        "Query requires a running active task,"
+                                            + " but partition was in state "
+                                            + state + " and was "
+                                            + (active ? "active" : "not active") + "."
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+
+                        // optimization: if we have handled all the requested partitions,
+                        // we can return right away.
+                        handledPartitions.add(partition);
+                        if (!request.isAllPartitions()
+                            && handledPartitions.containsAll(request.getPartitions())) {
+                            return result;
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }

Review comment:
       In general, the method is quite long. Could you try to extract methods with meaningful names? For example, the checks at the beginning that verify whether the state store is known and whether Kafka Streams has been started and not yet stopped could each be extracted into its own method.
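
       A rough sketch of the shape I have in mind, not a drop-in change. It is a simplified stand-in that does not depend on Kafka: the class, enum, and method names are all hypothetical, and plain JDK exceptions stand in for `UnknownStateStoreException`, `StreamsNotStartedException`, and `StreamsStoppedException`.

```java
import java.util.Collections;
import java.util.Set;

// Simplified stand-in for the checks at the top of KafkaStreams#query.
// Every name in this class is hypothetical; none of it is taken from the PR.
final class QueryPreconditions {

    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    // Stand-in for the UnknownStateStoreException check.
    static void validateStoreIsKnown(final Set<String> knownStores, final String storeName) {
        if (!knownStores.contains(storeName)) {
            throw new IllegalArgumentException(
                "Cannot get state store " + storeName
                    + " because no such store is registered in the topology.");
        }
    }

    // Stand-in for the StreamsNotStartedException check.
    static void validateIsStarted(final State state) {
        if (state == State.CREATED) {
            throw new IllegalStateException(
                "KafkaStreams has not been started, you can retry after calling start().");
        }
    }

    // Stand-in for the StreamsStoppedException check.
    static void validateIsNotStopped(final State state) {
        if (state == State.PENDING_SHUTDOWN || state == State.NOT_RUNNING) {
            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
        }
    }

    // The entry point now reads as a short list of named steps.
    static String query(final Set<String> knownStores, final State state, final String storeName) {
        validateStoreIsKnown(knownStores, storeName);
        validateIsStarted(state);
        validateIsNotStopped(state);
        return "queried " + storeName;
    }

    public static void main(final String[] args) {
        final Set<String> stores = Collections.singleton("counts");
        System.out.println(query(stores, State.RUNNING, "counts"));
    }
}
```

       The global-store branch and the per-task loop could be split out in the same way, so that `query()` itself only sequences the steps.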

##########
File path: streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class PositionBoundTest {
+
+    @Test
+    public void shouldCopyPosition() {
+        final Position position = Position.emptyPosition();
+        final PositionBound positionBound = PositionBound.at(position);
+        position.withComponent("topic", 1, 2L);
+
+        assertThat(position.getTopics(), equalTo(mkSet("topic")));
+        assertThat(positionBound.position().getTopics(), empty());
+    }
+
+    @Test
+    public void unboundedShouldBeUnbounded() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertTrue(bound.isUnbounded());
+    }
+
+    @Test
+    public void unboundedShouldThrowOnPosition() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertThrows(IllegalArgumentException.class, bound::position);
+    }
+
+    @Test
+    public void shouldEqualPosition() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        final PositionBound bound2 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualUnbounded() {
+        final PositionBound bound1 = PositionBound.unbounded();
+        final PositionBound bound2 = PositionBound.unbounded();
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualSelf() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound1);
+    }
+
+    @Test
+    public void shouldNotEqualNull() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertNotEquals(bound1, null);
+    }
+

Review comment:
       Could you add a test for when the unbounded flag differs and one for when the position differs? 
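
       To illustrate the two cases, here is a self-contained sketch. `Bound` is a hypothetical stand-in for `PositionBound`, with a `String` in place of `Position`, and its `equals()` is my assumption about how the real one compares both fields (the hunk above does not show it).

```java
import java.util.Objects;

// Hypothetical stand-in for PositionBound; a String replaces Position
// so that the sketch compiles without Kafka on the classpath.
final class Bound {
    private final String position;   // null when unbounded
    private final boolean unbounded;

    private Bound(final String position, final boolean unbounded) {
        this.position = position;
        this.unbounded = unbounded;
    }

    static Bound unbounded() {
        return new Bound(null, true);
    }

    static Bound at(final String position) {
        return new Bound(Objects.requireNonNull(position), false);
    }

    // Assumed equality: both the flag and the position must match.
    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final Bound that = (Bound) o;
        return unbounded == that.unbounded && Objects.equals(position, that.position);
    }

    @Override
    public int hashCode() {
        return Objects.hash(position, unbounded);
    }

    public static void main(final String[] args) {
        // Case 1: the unbounded flag differs.
        System.out.println(Bound.unbounded().equals(Bound.at("topic-1@2")));      // false
        // Case 2: both are bounded, but the position differs.
        System.out.println(Bound.at("topic-1@2").equals(Bound.at("topic-1@3")));  // false
    }
}
```

       Against the real class, these would be two `assertNotEquals` tests: one comparing `PositionBound.unbounded()` with `PositionBound.at(Position.emptyPosition())`, and one comparing a bound built from an empty position with a bound built from a position to which `withComponent("topic", 1, 2L)` has been applied.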

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Objects;
+
+/**
+ * A class bounding the processing state {@link Position} during queries. This can be used to
+ * specify that a query should fail if the locally available partition isn't caught up to the
+ * specified bound. "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
+public class PositionBound {
+
+    private final Position position;
+    private final boolean unbounded;
+
+    private PositionBound(final Position position, final boolean unbounded) {
+        if (unbounded && position != null) {
+            throw new IllegalArgumentException();
+        } else if (position != null) {
+            this.position = position.copy();
+            this.unbounded = false;
+        } else {
+            this.position = null;
+            this.unbounded = unbounded;
+        }
+    }
+
+    /**
+     * Creates a new PositionBound representing "no bound"
+     */
+    public static PositionBound unbounded() {
+        return new PositionBound(null, true);
+    }
+
+    /**
+     * Creates a new PositionBound representing a specific position.
+     */
+    public static PositionBound at(final Position position) {
+        return new PositionBound(position, false);
+    }
+
+    /**
+     * Returns true iff this object specifies that there is no position bound.
+     */
+    public boolean isUnbounded() {
+        return unbounded;
+    }
+
+    /**
+     * Returns the specific position of this bound.
+     *
+     * @throws IllegalArgumentException if this is an "unbounded" position.
+     */
+    public Position position() {
+        if (unbounded) {
+            throw new IllegalArgumentException(

Review comment:
       Shouldn't this be an `IllegalStateException`? No arguments are passed to the method. The Javadoc for `IllegalStateException` says:

       > Signals that a method has been invoked at an illegal or inappropriate time.

       which seems to be the case here.
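
       A minimal sketch of what I mean. `BoundSketch` is a hypothetical stand-in for `PositionBound` with a `String` in place of `Position`, and the exception message is made up because the hunk above cuts off before the original message.

```java
// Hypothetical stand-in for PositionBound#position(); not the PR's code.
final class BoundSketch {
    private final String position;   // null when unbounded
    private final boolean unbounded;

    BoundSketch(final String position, final boolean unbounded) {
        this.position = position;
        this.unbounded = unbounded;
    }

    // The caller passes no argument, so the problem is the state of this
    // object, which is what IllegalStateException is meant to signal.
    String position() {
        if (unbounded) {
            throw new IllegalStateException(
                "Cannot get the position of an unbounded bound."); // made-up message
        }
        return position;
    }

    public static void main(final String[] args) {
        System.out.println(new BoundSketch("topic-1@2", false).position());
        try {
            new BoundSketch(null, true).position();
        } catch (final IllegalStateException e) {
            System.out.println("unbounded: " + e.getMessage());
        }
    }
}
```

       If you change it, the `@throws` tag on `position()` and the `unboundedShouldThrowOnPosition` test in `PositionBoundTest` would need to expect `IllegalStateException` as well.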

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}), a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * example, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {

Review comment:
       `Position` is specified as an interface in the KIP. Could you update the KIP and inform the discuss thread?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position position;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    /**
+     * Static factory method to create a result object for a successful query. Used by StateStores
+     * to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    /**
+     * Static factory method to create a result object for a failed query. Used by StateStores to
+     * respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forFailure(
+        final FailureReason failureReason,
+        final String failureMessage) {
+
+        return new QueryResult<>(failureReason, failureMessage);
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store does
+     * not know how to handle the query.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute "
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new query type.");
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store has
+     * not yet caught up to the requested position bound.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    /**
+     * Used by stores to add detailed execution information (if requested) during query execution.
+     */
+    public void addExecutionInfo(final String message) {
+        executionInfo.add(message);
+    }
+
+    /**
+     * Used by stores to report what exact position in the store's history it was at when it
+     * executed the query.
+     */
+    public void setPosition(final Position position) {
+        this.position = position;
+    }
+
+    /**
+     * True iff the query was successfully executed. The response is available in {@link
+     * #getResult()}.
+     */
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+
+    /**
+     * True iff the query execution failed. More information about the failure is available in
+     * {@link #getFailureReason()} and {@link #getFailureMessage()}.
+     */
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    /**
+     * If detailed execution information was requested in {@link StateQueryRequest#enableExecutionInfo()},
+     * this method returns the execution details for this partition's result.
+     */
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    /**
+     * This state partition's exact position in its history when this query was executed. Can be
+     * used in conjunction with subsequent queries via {@link StateQueryRequest#withPositionBound(PositionBound)}.
+     * <p>
+     * Note: stores are encouraged, but not required to set this property.
+     */
+    public Position getPosition() {
+        return position;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the reason.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public FailureReason getFailureReason() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure reason because this query did not fail."
+            );
+        }
+        return failureReason;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the failure message.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public String getFailureMessage() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure message because this query did not fail."
+            );
+        }
+        return failure;
+    }
+
+    /**
+     * Returns the result of executing the query on one partition. The result type is determined by
+     * the query. Note: queries may choose to return {@code null} for a successful query, so {@link
+     * #isSuccess()} and {@link #isFailure()} must be used to determine whether the query
+     * was successful or failed on this partition.
+     *
+     * @throws IllegalArgumentException if this is not a successful query.
+     */
+    public R getResult() {
+        if (!isSuccess()) {
+            throw new IllegalArgumentException(
+                "Cannot get result for failed query. Failure is " + failureReason.name() + ": "
+                    + failure);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "QueryResult{" +
+            "executionInfo=" + executionInfo +
+            ", failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", result=" + result +
+            ", position=" + position +
+            '}';
+    }
+}

Review comment:
       I could not find `swapResult()` as described in the KIP.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Query.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+/**
+ * Marker interface that all interactive queries must implement (see {@link
+ * org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}).
+ * <p>
+ * You can find all available queries by searching for classes implementing this interface.
+ * <p>
+ * Kafka Streams will pass unknown query types straight through into the bytes stores, so callers
+ * can add custom queries by implementing this interface and providing custom stores that handle
+ * them (via {@link org.apache.kafka.streams.state.StoreSupplier}s).
+ * <p>
+ * See KIP-796 (https://cwiki.apache.org/confluence/x/34xnCw) for more details.
+ *
+ * @param <R> The type of the result returned by this query.
+ */
+public interface Query<R> {

Review comment:
       Shouldn't this also be `@Evolving`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Objects;
+
+/**
+ * A class bounding the processing state {@link Position} during queries. This can be used to
+ * specify that a query should fail if the locally available partition isn't caught up to the
+ * specified bound. "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
+public class PositionBound {

Review comment:
       `PositionBound` is specified as an interface in the KIP. Could you update the KIP and inform the discuss thread?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries. This wraps the individual partition results, as well
+ * as metadata relating to the result as a whole.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query. Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {
+        this.globalResult = r;
+    }
+
+    /**
+     * Set the result for a partitioned store query. Used by Kafka Streams and available for tests.
+     */
+    public void addResult(final int partition, final QueryResult<R> r) {

Review comment:
       I could not find this method in the KIP. Could you update the KIP?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            final Set<Integer> handledPartitions = new HashSet<>();
+
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(
+                                        FailureReason.NOT_ACTIVE,
+                                        "Query requires a running active task,"
+                                            + " but partition was in state "
+                                            + state + " and was "
+                                            + (active ? "active" : "not active") + "."
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+
+                        // optimization: if we have handled all the requested partitions,
+                        // we can return right away.
+                        handledPartitions.add(partition);
+                        if (!request.isAllPartitions()
+                            && handledPartitions.containsAll(request.getPartitions())) {
+                            return result;
+                        }

Review comment:
       nit: Instead of putting a comment, could you extract the condition and give it a meaningful name?
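
       Something along these lines, as a sketch only. The class and the helper name `allRequestedPartitionsHandled` are hypothetical, and the helper takes plain values so that the example compiles without `StateQueryRequest`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper that replaces the "optimization" comment with a name.
final class HandledPartitions {

    // True once every explicitly requested partition has been handled.
    // A request for all partitions never returns early.
    static boolean allRequestedPartitionsHandled(
        final boolean allPartitionsRequested,
        final Set<Integer> requestedPartitions,
        final Set<Integer> handledPartitions) {

        return !allPartitionsRequested
            && handledPartitions.containsAll(requestedPartitions);
    }

    public static void main(final String[] args) {
        final Set<Integer> requested = new HashSet<>(Arrays.asList(0, 1));
        final Set<Integer> handled = new HashSet<>();

        handled.add(0);
        System.out.println(allRequestedPartitionsHandled(false, requested, handled)); // false

        handled.add(1);
        System.out.println(allRequestedPartitionsHandled(false, requested, handled)); // true
    }
}
```

       In `query()`, the early return would then read `if (allRequestedPartitionsHandled(...)) { return result; }` and the comment becomes unnecessary.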

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {

Review comment:
       Could you add unit tests for this class?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries. This wraps the individual partition results, as well
+ * as metadata relating to the result as a whole.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query. Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {

Review comment:
       I could not find this method in the KIP. Could you update the KIP?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+
+public final class StoreQueryUtils {

Review comment:
       Could you add unit tests for this class?



