You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/30 22:23:34 UTC

[GitHub] [kafka] vvcephei opened a new pull request #11557: KAFKA-13491: IQv2 framework

vvcephei opened a new pull request #11557:
URL: https://github.com/apache/kafka/pull/11557


   Implements a new interactive query framework,
   as described in KIP-805 (https://cwiki.apache.org/confluence/x/85OqCw).
   
   No public queries are added in this PR, just the framework and tests.
   
   Also, position tracking and bounding is not implemented in this PR.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762148745



##########
File path: streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+public class PositionTest {
+
+    @Test
+    public void shouldCreateFromMap() {
+        final Map<String, Map<Integer, Long>> map = mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        );
+
+        final Position position = Position.fromMap(map);
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+
+        // Should be a copy of the constructor map
+
+        map.get("topic1").put(99, 99L);
+
+        // so the position is still the original one
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldCreateFromNullMap() {
+        final Position position = Position.fromMap(null);
+        assertThat(position.getTopics(), equalTo(Collections.emptySet()));
+    }
+
+    @Test
+    public void shouldMerge() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position position1 = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset
+            mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition
+            mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic
+        ));
+
+        final Position merged = position.merge(position1);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+    }
+
+    @Test
+    public void shouldUpdateComponentMonotonically() {
+        final Position position = Position.emptyPosition();
+        position.withComponent("topic", 3, 5L);
+        position.withComponent("topic", 3, 4L);
+        assertThat(position.getBound("topic").get(3), equalTo(5L));
+        position.withComponent("topic", 3, 6L);
+        assertThat(position.getBound("topic").get(3), equalTo(6L));
+    }
+
+    @Test
+    public void shouldCopy() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position copy = position.copy();
+
+        // mutate original
+        position.withComponent("topic", 0, 6L);
+        position.withComponent("topic1", 8, 1L);
+        position.withComponent("topic2", 2, 4L);
+
+        // copy has not changed
+        assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(copy.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+
+        // original has changed
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
+    }
+
+    @Test
+    public void shouldMergeNull() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position merged = position.merge(null);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));

Review comment:
       sure thing!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762146538



##########
File path: checkstyle/suppressions.xml
##########
@@ -164,7 +164,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>
+              files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>

Review comment:
       Yes, for the same reason that you thought the `query` method was too long and complicated :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760233137



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,100 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(

Review comment:
       Also, the failure reason is a bit misleading




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#issuecomment-985745412


   Thanks again for the review, @cadonna ! I'm tempted to fix that stuff in the current PR, but I'd like to unblock others from rebasing their work on top of this framework. Since none of the feedback would invalidate those rebases, I'll go ahead and merge and then follow up asap with a PR to address your feedback.
   
   Thanks so much for your time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760231430



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,100 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(

Review comment:
       Why do we add a failure here? The for loop will continue iterating until it finds an active task that hosts that store partition, right? Don't you want to add the failure if no such active partition is found?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762154915



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position position;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    /**
+     * Static factory method to create a result object for a successful query. Used by StateStores
+     * to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    /**
+     * Static factory method to create a result object for a failed query. Used by StateStores to
+     * respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forFailure(
+        final FailureReason failureReason,
+        final String failureMessage) {
+
+        return new QueryResult<>(failureReason, failureMessage);
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store does
+     * not know how to handle the query.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute "
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new query type.");
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store has
+     * not yet caught up to the requested position bound.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    /**
+     * Used by stores to add detailed execution information (if requested) during query execution.
+     */
+    public void addExecutionInfo(final String message) {
+        executionInfo.add(message);
+    }
+
+    /**
+     * Used by stores to report what exact position in the store's history it was at when it
+     * executed the query.
+     */
+    public void setPosition(final Position position) {
+        this.position = position;
+    }
+
+    /**
+     * True iff the query was successfully executed. The response is available in {@link
+     * this#getResult()}.
+     */
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+
+    /**
+     * True iff the query execution failed. More information about the failure is available in
+     * {@link this#getFailureReason()} and {@link this#getFailureMessage()}.
+     */
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    /**
+     * If detailed execution information was requested in {@link StateQueryRequest#enableExecutionInfo()},
+     * this method returned the execution details for this partition's result.
+     */
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    /**
+     * This state partition's exact position in its history when this query was executed. Can be
+     * used in conjunction with subsequent queries via {@link StateQueryRequest#withPositionBound(PositionBound)}.
+     * <p>
+     * Note: stores are encouraged, but not required to set this property.
+     */
+    public Position getPosition() {
+        return position;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the reason.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public FailureReason getFailureReason() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure reason because this query did not fail."
+            );
+        }
+        return failureReason;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the failure message.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public String getFailureMessage() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure message because this query did not fail."
+            );
+        }
+        return failure;
+    }
+
+    /**
+     * Returns the result of executing the query on one partition. The result type is determined by
+     * the query. Note: queries may choose to return {@code null} for a successful query, so {@link
+     * this#isSuccess()} and {@link this#isFailure()} must be used to determine whether the query
+     * was successful of failed on this partition.
+     *
+     * @throws IllegalArgumentException if this is not a successful query.
+     */
+    public R getResult() {
+        if (!isSuccess()) {
+            throw new IllegalArgumentException(
+                "Cannot get result for failed query. Failure is " + failureReason.name() + ": "
+                    + failure);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "QueryResult{" +
+            "executionInfo=" + executionInfo +
+            ", failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", result=" + result +
+            ", position=" + position +
+            '}';
+    }
+}

Review comment:
       Ah, I dropped it from the PR because I'm no longer 100% sure it's necessary. I left it in the KIP, though, in case I'm wrong about that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#issuecomment-983926936


   Rebased to resolve conflicts with #11541


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] patrickstuedi commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r761770579



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <R> QueryResult<R> handleBasicQueries(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo,
+        final StateStore store) {
+
+        final QueryResult<R> result;
+        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+        // TODO: position tracking
+        if (query instanceof PingQuery) {
+            result = (QueryResult<R>) QueryResult.forResult(true);

Review comment:
       How do we ensure type R matches the type of template type of the store?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762081422



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            final Set<Integer> handledPartitions = new HashSet<>();
+
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(
+                                        FailureReason.NOT_ACTIVE,
+                                        "Query requires a running active task,"
+                                            + " but partition was in state "
+                                            + state + " and was "
+                                            + (active ? "active" : "not active") + "."
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+
+                        // optimization: if we have handled all the requested partitions,
+                        // we can return right away.
+                        handledPartitions.add(partition);
+                        if (!request.isAllPartitions()
+                            && handledPartitions.containsAll(request.getPartitions())) {
+                            return result;
+                        }

Review comment:
       Ah, I see what you mean. Yes, it's a bit mysterious this way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762163072



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+
+public final class StoreQueryUtils {

Review comment:
       Yeah, you're right. These do need unit tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11557:
URL: https://github.com/apache/kafka/pull/11557


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762153447



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Objects;
+
+/**
+ * A class bounding the processing state {@link Position} during queries. This can be used to
+ * specify that a query should fail if the locally available partition isn't caught up to the
+ * specified bound. "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
+public class PositionBound {
+
+    private final Position position;
+    private final boolean unbounded;
+
+    private PositionBound(final Position position, final boolean unbounded) {
+        if (unbounded && position != null) {
+            throw new IllegalArgumentException();
+        } else if (position != null) {
+            this.position = position.copy();
+            this.unbounded = false;
+        } else {
+            this.position = null;
+            this.unbounded = unbounded;
+        }
+    }
+
+    /**
+     * Creates a new PositionBound representing "no bound"
+     */
+    public static PositionBound unbounded() {
+        return new PositionBound(null, true);
+    }
+
+    /**
+     * Creates a new PositionBound representing a specific position.
+     */
+    public static PositionBound at(final Position position) {
+        return new PositionBound(position, false);
+    }
+
+    /**
+     * Returns true iff this object specifies that there is no position bound.
+     */
+    public boolean isUnbounded() {
+        return unbounded;
+    }
+
+    /**
+     * Returns the specific position of this bound.
+     *
+     * @throws IllegalArgumentException if this is an "unbounded" position.
+     */
+    public Position position() {
+        if (unbounded) {
+            throw new IllegalArgumentException(

Review comment:
       Yeah, I realize this is a bit creative. I was thinking of the fact that `this` is an implicit argument to all instance methods.
   
   ISE seems to imply that we're in some kind of unexpected edge case, and the docs imply that it's probably timing related. Neither of those are appropriate here, since, if you create an unbounded PositionBound, it's simply illegal to ask for its position.
   
   It seems "the people" are also divided on this point (https://stackoverflow.com/questions/30086604/which-exception-to-throw-if-method-is-not-appropriate-for-current-object)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762081149



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =

Review comment:
       Sure! Sorry about that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] patrickstuedi commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760218888



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Note: enforces monotonicity on offsets. I.e., if there is already a component for the same
+     * topic and partition with a larger offset, the update will succeed but not overwrite the
+     * offset.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .compute(
+                partition,
+                (integer, prior) -> prior == null || offset > prior ? offset : prior

Review comment:
       Nice, this avoids the use of AtomicLong! 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760404989



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Note: enforces monotonicity on offsets. I.e., if there is already a component for the same
+     * topic and partition with a larger offset, the update will succeed but not overwrite the
+     * offset.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .compute(
+                partition,
+                (integer, prior) -> prior == null || offset > prior ? offset : prior

Review comment:
       Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760254988



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
##########
@@ -184,6 +187,20 @@ public boolean isOpen() {
             return wrapped.isOpen();
         }
 
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo) {
+
+            final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);

Review comment:
       Not related to this PR but why is this not a WrappedStore since it is wrapping another store?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r761760939



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =

Review comment:
       nit: Could you give a more meaningful name to this variable?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Note: enforces monotonicity on offsets. I.e., if there is already a component for the same
+     * topic and partition with a larger offset, the update will succeed but not overwrite the
+     * offset.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .compute(
+                partition,
+                (integer, prior) -> prior == null || offset > prior ? offset : prior
+            );
+        return this;
+    }
+
+    /**
+     * Create a deep copy of the Position.
+     */
+    public Position copy() {
+        return new Position(deepCopy(position));
+    }
+
+    /**
+     * Create a new, structurally independent Position that is the result of merging two other
+     * Positions.
+     * <p>
+     * If both Positions contain the same topic -> partition -> offset mapping, the resulting
+     * Position will contain a mapping with the larger of the two offsets.
+     */
+    public Position merge(final Position other) {
+        if (other == null) {
+            return this;

Review comment:
       It is a bit unexpected that the result is the instance itself. I would have expected a new instance. Anyways, could you document this special case in the javadocs?  

##########
File path: checkstyle/suppressions.xml
##########
@@ -164,7 +164,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>
+              files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>

Review comment:
       Is this really needed?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -119,4 +124,32 @@ default void init(final StateStoreContext context, final StateStore root) {
      * @return {@code true} if the store is open
      */
     boolean isOpen();
+
+    /**
+     * Execute a query. Returns a QueryResult containing either result data or
+     * a failure.
+     * <p>
+     * If the store doesn't know how to handle the given query, the result
+     * shall be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+     * If the store couldn't satisfy the given position bound, the result
+     * shall be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * <p>
+     * Note to store implementers: if your store does not support position tracking,
+     * you can correctly respond {@link FailureReason#NOT_UP_TO_BOUND} if the argument is
+     * anything but {@link PositionBound#unbounded()}. Be sure to explain in the failure message
+     * that bounded positions are not supported.
+     * <p>
+     * @param query The query to execute
+     * @param positionBound The position the store must be at or past
+     * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
+     * @param <R> The result type
+     */
+    @Evolving
+    default <R> QueryResult<R> query(
+        Query<R> query,
+        PositionBound positionBound,
+        boolean collectExecutionInfo) {
+        // If a store doesn't implement a query handler, then all queries are unknown.

Review comment:
       Could you add this to the javadocs since it seems to be an important information for the outside world?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position position;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    /**
+     * Static factory method to create a result object for a successful query. Used by StateStores
+     * to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    /**
+     * Static factory method to create a result object for a failed query. Used by StateStores to
+     * respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forFailure(
+        final FailureReason failureReason,
+        final String failureMessage) {
+
+        return new QueryResult<>(failureReason, failureMessage);
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store does
+     * not know how to handle the query.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute "
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new query type.");
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store has
+     * not yet caught up to the requested position bound.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    /**
+     * Used by stores to add detailed execution information (if requested) during query execution.
+     */
+    public void addExecutionInfo(final String message) {
+        executionInfo.add(message);
+    }
+
+    /**
+     * Used by stores to report what exact position in the store's history it was at when it
+     * executed the query.
+     */
+    public void setPosition(final Position position) {
+        this.position = position;
+    }
+
+    /**
+     * True iff the query was successfully executed. The response is available in {@link
+     * this#getResult()}.
+     */
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+
+    /**
+     * True iff the query execution failed. More information about the failure is available in
+     * {@link this#getFailureReason()} and {@link this#getFailureMessage()}.
+     */
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    /**
+     * If detailed execution information was requested in {@link StateQueryRequest#enableExecutionInfo()},
+     * this method returned the execution details for this partition's result.
+     */
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    /**
+     * This state partition's exact position in its history when this query was executed. Can be
+     * used in conjunction with subsequent queries via {@link StateQueryRequest#withPositionBound(PositionBound)}.
+     * <p>
+     * Note: stores are encouraged, but not required to set this property.
+     */
+    public Position getPosition() {
+        return position;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the reason.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public FailureReason getFailureReason() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(

Review comment:
       See my comment above regarding `IllegalArgumentException` vs. `IllegalStateException`.  There are also other occurrences in this class.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries. This wraps the individual partition results, as well
+ * as metadata relating to the result as a whole.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query. Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {
+        this.globalResult = r;
+    }
+
+    /**
+     * Set the result for a partitioned store query. Used by Kafka Streams and available for tests.
+     */
+    public void addResult(final int partition, final QueryResult<R> r) {
+        partitionResults.put(partition, r);
+    }
+
+
+    /**
+     * The query's result for each partition that executed the query. Empty for global store
+     * queries.
+     */
+    public Map<Integer, QueryResult<R>> getPartitionResults() {
+        return partitionResults;
+    }
+
+    /**
+     * For queries that are expected to match records in only one partition, returns the result.
+     *
+     * @throws IllegalArgumentException if the results are not for exactly one partition.
+     */
+    public QueryResult<R> getOnlyPartitionResult() {
+        final List<QueryResult<R>> nonempty =
+            partitionResults
+                .values()
+                .stream()
+                .filter(r -> r.getResult() != null)
+                .collect(Collectors.toList());
+
+        if (nonempty.size() != 1) {
+            throw new IllegalArgumentException(
+                "The query did not return exactly one partition result: " + partitionResults
+            );
+        } else {
+            return nonempty.get(0);
+        }
+    }
+
+    /**
+     * The query's result for global store queries. Is {@code null} for non-global (partitioned)
+     * store queries.
+     */
+    public QueryResult<R> getGlobalResult() {

Review comment:
       I could not find this method in the KIP. Could you update the KIP?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {

Review comment:
       Shouldn't this also be `@Evolving`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+public class PositionTest {
+
+    @Test
+    public void shouldCreateFromMap() {
+        final Map<String, Map<Integer, Long>> map = mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        );
+
+        final Position position = Position.fromMap(map);
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+
+        // Should be a copy of the constructor map
+
+        map.get("topic1").put(99, 99L);
+
+        // so the position is still the original one
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldCreateFromNullMap() {
+        final Position position = Position.fromMap(null);
+        assertThat(position.getTopics(), equalTo(Collections.emptySet()));
+    }
+
+    @Test
+    public void shouldMerge() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position position1 = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset
+            mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition
+            mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic
+        ));
+
+        final Position merged = position.merge(position1);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+    }
+
+    @Test
+    public void shouldUpdateComponentMonotonically() {
+        final Position position = Position.emptyPosition();
+        position.withComponent("topic", 3, 5L);
+        position.withComponent("topic", 3, 4L);
+        assertThat(position.getBound("topic").get(3), equalTo(5L));
+        position.withComponent("topic", 3, 6L);
+        assertThat(position.getBound("topic").get(3), equalTo(6L));
+    }
+
+    @Test
+    public void shouldCopy() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position copy = position.copy();
+
+        // mutate original
+        position.withComponent("topic", 0, 6L);
+        position.withComponent("topic1", 8, 1L);
+        position.withComponent("topic2", 2, 4L);
+
+        // copy has not changed
+        assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(copy.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+
+        // original has changed
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
+    }
+
+    @Test
+    public void shouldMergeNull() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position merged = position.merge(null);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));

Review comment:
       Could you please verify whether the result is the same instance or not? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            final Set<Integer> handledPartitions = new HashSet<>();
+
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(
+                                        FailureReason.NOT_ACTIVE,
+                                        "Query requires a running active task,"
+                                            + " but partition was in state "
+                                            + state + " and was "
+                                            + (active ? "active" : "not active") + "."
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+
+                        // optimization: if we have handled all the requested partitions,
+                        // we can return right away.
+                        handledPartitions.add(partition);
+                        if (!request.isAllPartitions()
+                            && handledPartitions.containsAll(request.getPartitions())) {
+                            return result;
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }

Review comment:
       In general, the method is quite long. Could you try to extract methods with meaningful names? For example, the checks in the beginning to verify whether the state is known or the stream thread is running can be extracted to methods.

##########
File path: streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class PositionBoundTest {
+
+    @Test
+    public void shouldCopyPosition() {
+        final Position position = Position.emptyPosition();
+        final PositionBound positionBound = PositionBound.at(position);
+        position.withComponent("topic", 1, 2L);
+
+        assertThat(position.getTopics(), equalTo(mkSet("topic")));
+        assertThat(positionBound.position().getTopics(), empty());
+    }
+
+    @Test
+    public void unboundedShouldBeUnbounded() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertTrue(bound.isUnbounded());
+    }
+
+    @Test
+    public void unboundedShouldThrowOnPosition() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertThrows(IllegalArgumentException.class, bound::position);
+    }
+
+    @Test
+    public void shouldEqualPosition() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        final PositionBound bound2 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualUnbounded() {
+        final PositionBound bound1 = PositionBound.unbounded();
+        final PositionBound bound2 = PositionBound.unbounded();
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualSelf() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound1);
+    }
+
+    @Test
+    public void shouldNotEqualNull() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertNotEquals(bound1, null);
+    }
+

Review comment:
       Could you add a test for when the unbounded flag differs and one for when the position differs? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Objects;
+
+/**
+ * A class bounding the processing state {@link Position} during queries. This can be used to
+ * specify that a query should fail if the locally available partition isn't caught up to the
+ * specified bound. "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
+public class PositionBound {
+
+    private final Position position;
+    private final boolean unbounded;
+
+    private PositionBound(final Position position, final boolean unbounded) {
+        if (unbounded && position != null) {
+            throw new IllegalArgumentException();
+        } else if (position != null) {
+            this.position = position.copy();
+            this.unbounded = false;
+        } else {
+            this.position = null;
+            this.unbounded = unbounded;
+        }
+    }
+
+    /**
+     * Creates a new PositionBound representing "no bound"
+     */
+    public static PositionBound unbounded() {
+        return new PositionBound(null, true);
+    }
+
+    /**
+     * Creates a new PositionBound representing a specific position.
+     */
+    public static PositionBound at(final Position position) {
+        return new PositionBound(position, false);
+    }
+
+    /**
+     * Returns true iff this object specifies that there is no position bound.
+     */
+    public boolean isUnbounded() {
+        return unbounded;
+    }
+
+    /**
+     * Returns the specific position of this bound.
+     *
+     * @throws IllegalArgumentException if this is an "unbounded" position.
+     */
+    public Position position() {
+        if (unbounded) {
+            throw new IllegalArgumentException(

Review comment:
       Shouldn't this be an `IllegalStateException`? No arguments are passed to the method. The javadocs for `IllegalStateException`: 
   
   > Signals that a method has been invoked at an illegal or inappropriate time.
   
   which seems to be the case here. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {

Review comment:
       `Position` is specified as an interface in the KIP. Could you update the KIP and inform the discuss thread?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position position;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    /**
+     * Static factory method to create a result object for a successful query. Used by StateStores
+     * to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    /**
+     * Static factory method to create a result object for a failed query. Used by StateStores to
+     * respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forFailure(
+        final FailureReason failureReason,
+        final String failureMessage) {
+
+        return new QueryResult<>(failureReason, failureMessage);
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store does
+     * not know how to handle the query.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute "
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new query type.");
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store has
+     * not yet caught up to the requested position bound.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    /**
+     * Used by stores to add detailed execution information (if requested) during query execution.
+     */
+    public void addExecutionInfo(final String message) {
+        executionInfo.add(message);
+    }
+
+    /**
+     * Used by stores to report what exact position in the store's history it was at when it
+     * executed the query.
+     */
+    public void setPosition(final Position position) {
+        this.position = position;
+    }
+
+    /**
+     * True iff the query was successfully executed. The response is available in {@link
+     * this#getResult()}.
+     */
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+
+    /**
+     * True iff the query execution failed. More information about the failure is available in
+     * {@link this#getFailureReason()} and {@link this#getFailureMessage()}.
+     */
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    /**
+     * If detailed execution information was requested in {@link StateQueryRequest#enableExecutionInfo()},
+     * this method returned the execution details for this partition's result.
+     */
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    /**
+     * This state partition's exact position in its history when this query was executed. Can be
+     * used in conjunction with subsequent queries via {@link StateQueryRequest#withPositionBound(PositionBound)}.
+     * <p>
+     * Note: stores are encouraged, but not required to set this property.
+     */
+    public Position getPosition() {
+        return position;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the reason.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public FailureReason getFailureReason() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure reason because this query did not fail."
+            );
+        }
+        return failureReason;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the failure message.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public String getFailureMessage() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure message because this query did not fail."
+            );
+        }
+        return failure;
+    }
+
+    /**
+     * Returns the result of executing the query on one partition. The result type is determined by
+     * the query. Note: queries may choose to return {@code null} for a successful query, so {@link
+     * this#isSuccess()} and {@link this#isFailure()} must be used to determine whether the query
+     * was successful of failed on this partition.
+     *
+     * @throws IllegalArgumentException if this is not a successful query.
+     */
+    public R getResult() {
+        if (!isSuccess()) {
+            throw new IllegalArgumentException(
+                "Cannot get result for failed query. Failure is " + failureReason.name() + ": "
+                    + failure);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "QueryResult{" +
+            "executionInfo=" + executionInfo +
+            ", failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", result=" + result +
+            ", position=" + position +
+            '}';
+    }
+}

Review comment:
       I could not find `swapResult()` as described in the KIP.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Query.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+/**
+ * Marker interface that all interactive queries must implement (see {@link
+ * org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}).
+ * <p>
+ * You can find all available queries by searching for classes implementing this interface.
+ * <p>
+ * Kafka Streams will pass unknown query types straight through into the bytes stores, so callers
+ * can add custom queries by implementing this interface and providing custom stores that handle
+ * them (via {@link org.apache.kafka.streams.state.StoreSupplier}s.
+ * <p>
+ * See KIP-796 (https://cwiki.apache.org/confluence/x/34xnCw) for more details.
+ *
+ * @param <R> The type of the result returned by this query.
+ */
+public interface Query<R> {

Review comment:
       Shouldn't this also be `@Evolving`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Objects;
+
+/**
+ * A class bounding the processing state {@link Position} during queries. This can be used to
+ * specify that a query should fail if the locally available partition isn't caught up to the
+ * specified bound. "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
+public class PositionBound {

Review comment:
       `PositionBound` is specified as an interface in the KIP. Could you update the KIP and inform the discuss thread?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries. This wraps the individual partition results, as well
+ * as metadata relating to the result as a whole.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query. Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {
+        this.globalResult = r;
+    }
+
+    /**
+     * Set the result for a partitioned store query. Used by Kafka Streams and available for tests.
+     */
+    public void addResult(final int partition, final QueryResult<R> r) {

Review comment:
       I could not find this method in the KIP. Could you update the KIP?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            final Set<Integer> handledPartitions = new HashSet<>();
+
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(
+                                        FailureReason.NOT_ACTIVE,
+                                        "Query requires a running active task,"
+                                            + " but partition was in state "
+                                            + state + " and was "
+                                            + (active ? "active" : "not active") + "."
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+
+                        // optimization: if we have handled all the requested partitions,
+                        // we can return right away.
+                        handledPartitions.add(partition);
+                        if (!request.isAllPartitions()
+                            && handledPartitions.containsAll(request.getPartitions())) {
+                            return result;
+                        }

Review comment:
       nit: Instead of putting a comment, could you extract the condition and give it a meaningful name?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {

Review comment:
       Could you add unit tests for this class?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries. This wraps the individual partition results, as well
+ * as metadata relating to the result as a whole.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query. Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {

Review comment:
       I could not find this method in the KIP. Could you update the KIP?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+
+public final class StoreQueryUtils {

Review comment:
       Could you add unit tests for this class?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762147145



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -119,4 +124,32 @@ default void init(final StateStoreContext context, final StateStore root) {
      * @return {@code true} if the store is open
      */
     boolean isOpen();
+
+    /**
+     * Execute a query. Returns a QueryResult containing either result data or
+     * a failure.
+     * <p>
+     * If the store doesn't know how to handle the given query, the result
+     * shall be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+     * If the store couldn't satisfy the given position bound, the result
+     * shall be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * <p>
+     * Note to store implementers: if your store does not support position tracking,
+     * you can correctly respond {@link FailureReason#NOT_UP_TO_BOUND} if the argument is
+     * anything but {@link PositionBound#unbounded()}. Be sure to explain in the failure message
+     * that bounded positions are not supported.
+     * <p>
+     * @param query The query to execute
+     * @param positionBound The position the store must be at or past
+     * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
+     * @param <R> The result type
+     */
+    @Evolving
+    default <R> QueryResult<R> query(
+        Query<R> query,
+        PositionBound positionBound,
+        boolean collectExecutionInfo) {
+        // If a store doesn't implement a query handler, then all queries are unknown.

Review comment:
       You're totally right. I thought I had said that already, but maybe I'm thinking of some other conversation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#issuecomment-985750865


   The failures are unrelated:
   ```
       Build / ARM / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
       Build / JDK 8 and Scala 2.12 / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
       Build / JDK 11 and Scala 2.13 / kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r759708995



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -46,8 +46,8 @@
 
     /**
      * Return the metadata of the current topic/partition/offset if available.
-     * This is defined as the metadata of the record that is currently been
-     * processed by the StreamTask that holds the store.
+     * This is defined as the metadata of the record that is currently being
+     * processed (or was last processed) by the StreamTask that holds the store.

Review comment:
       This is a bit of a drive-by; I just noticed the wording could be improved.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
##########
@@ -115,6 +118,24 @@ public boolean isOpen() {
         return store.isOpen();
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+
+        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+        final QueryResult<R> result = store.query(query, positionBound, collectExecutionInfo);

Review comment:
       Since these adapters are (presumably for a good reason) not extending WrappedStore, they have to have an implementation to delegate to their inner store.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/Stores.java
##########
@@ -122,22 +122,7 @@ public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(fina
      */
     public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
         Objects.requireNonNull(name, "name cannot be null");
-        return new KeyValueBytesStoreSupplier() {
-            @Override
-            public String name() {
-                return name;
-            }
-
-            @Override
-            public KeyValueStore<Bytes, byte[]> get() {
-                return new InMemoryKeyValueStore(name);
-            }
-
-            @Override
-            public String metricsScope() {
-                return "in-memory";
-            }
-        };
+        return new InMemoryKeyValueBytesStoreSupplier(name);

Review comment:
       Having this inline made it harder to debug my test, so I just popped it out into a separate class.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -118,7 +136,7 @@ private void putInternal(final Bytes key, final byte[] value) {
 
         if (context != null && context.recordMetadata().isPresent()) {
             final RecordMetadata meta = context.recordMetadata().get();
-            position = position.update(meta.topic(), meta.partition(), meta.offset());
+            position = position.withComponent(meta.topic(), meta.partition(), meta.offset());

Review comment:
       Just migrating to the public Position API from the KIP.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java
##########
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-
-public class Position {

Review comment:
       Dropped the internal Position API and migrated to the new public Position API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <R> QueryResult<R> handleBasicQueries(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo,
+        final StateStore store) {
+
+        final QueryResult<R> result;
+        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+        // TODO: position tracking

Review comment:
       Just pointing out again that this isn't done in this PR.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
##########
@@ -103,6 +106,22 @@ public void close() {
         wrapped.close();
     }
 
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,

Review comment:
       Ok, here's the WrappedStore I keep referencing. Store layers that wrap other stores can extend this class, and if they do, they'll automatically inherit this delegating query implementation.
   
   Nb: because of this, there are dozens of other store implementations that don't appear in this PR because they didn't require any changes.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsNotStartedException;
+import org.apache.kafka.streams.errors.StreamsStoppedException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Collections.singleton;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {

Review comment:
       First of two new integration tests.
   
   This one is more focused on the framework: verifying exceptions, etc. The other one is more focused on the store/query behavior, and is therefore more interesting.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class IQv2StoreIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+
+    public static class UnknownQuery implements Query<Void> {
+
+    }
+
+    private final boolean cache;
+    private final boolean log;
+
+    private KafkaStreams kafkaStreams;
+
+    public enum StoresToTest {
+        GLOBAL_IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+        },
+        ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+        },
+        TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        TIME_ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
+                    WINDOW_SIZE, false);
+            }
+        },
+        IN_MEMORY_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        },
+        ROCKS_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        };
+
+        public abstract StoreSupplier<?> supplier();
+
+        public boolean global() {
+            return false;
+        }
+    }
+
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
+            for (final boolean logEnabled : Arrays.asList(true, false)) {
+                for (final StoresToTest toTest : StoresToTest.values()) {

Review comment:
       In addition to iterating over all store types, we also iterate over caching and logging configs, so that we can be sure that IQv2 works regardless of the store configuration.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java
##########
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-import java.io.IOException;
-import org.junit.Test;
-
-public class PositionTest {

Review comment:
       These cases are migrated into the new unit test.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class IQv2StoreIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+
+    public static class UnknownQuery implements Query<Void> {
+
+    }
+
+    private final boolean cache;
+    private final boolean log;
+
+    private KafkaStreams kafkaStreams;
+
+    public enum StoresToTest {

Review comment:
       This test contains a registry of all officially provided plug-innable store types.
   
   The test just iterates over this registry and runs the checks on each store, so this can be a good place to add tests for new queries that should be supported, and should return the same results, for all stores.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueBytesStoreSupplier.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class InMemoryKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier {

Review comment:
       Just extracted from Stores above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.streams.KafkaStreams.State;
+
+/**
+ * Indicates that Kafka Streams is in a terminating or terminal state, such as {@link
+ * State#PENDING_SHUTDOWN},{@link State#PENDING_ERROR},{@link State#NOT_RUNNING}, or {@link
+ * State#ERROR}. This Streams instance will need to be discarded and replaced before it can
+ * serve queries. The caller may wish to query a different instance.
+ */
+public class StreamsStoppedException extends InvalidStateStoreException {

Review comment:
       New exception proposed in the KIP. See KafkaStreams#query javadoc.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
##########
@@ -184,6 +187,20 @@ public boolean isOpen() {
             return wrapped.isOpen();
         }
 
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo) {
+
+            final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);

Review comment:
       Another explicit delegation, since this is not a WrappedStore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.Query;
+
+/**
+ * A very simple query that all stores can handle to verify that the store is participating in the
+ * IQv2 framework properly.
+ * <p>
+ * This is not a public API and may change without notice.
+ */
+public class PingQuery implements Query<Boolean> {

Review comment:
       The javadoc pretty much says it. This query is just for testing. It resides in the main module, since it needs to be visible to the store implementations, but it's not in the public API.
   
   You'll notice that when stores handle this query, they just return `true`. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,100 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {

Review comment:
       Main entry point for the new API.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsNotStartedException;
+import org.apache.kafka.streams.errors.StreamsStoppedException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Collections.singleton;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.table(
+            INPUT_TOPIC_NAME,
+            Consumed.with(Serdes.Integer(), Serdes.Integer()),
+            Materialized.as(STORE_NAME)
+        );
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+        kafkaStreams.cleanUp();
+    }
+
+    @After
+    public void afterTest() {
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void shouldFailUnknownStore() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore("unknown-store").withQuery(query);
+
+        assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailNotStarted() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailStopped() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        kafkaStreams.start();
+        kafkaStreams.close(Duration.ZERO);
+        assertThrows(StreamsStoppedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldRejectNonRunningActive()
+        throws NoSuchFieldException, IllegalAccessException {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).requireActive();
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        kafkaStreams.start();
+
+        final Field threadsField = KafkaStreams.class.getDeclaredField("threads");
+        threadsField.setAccessible(true);
+        @SuppressWarnings("unchecked") final List<StreamThread> threads =
+            (List<StreamThread>) threadsField.get(kafkaStreams);
+        final StreamThread streamThread = threads.get(0);
+
+        final Field stateLock = StreamThread.class.getDeclaredField("stateLock");
+        stateLock.setAccessible(true);
+        final Object lock = stateLock.get(streamThread);
+
+        // wait for the desired partitions to be assigned
+        IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+            kafkaStreams,
+            inStore(STORE_NAME).withQuery(query),
+            partitions
+        );
+
+        // then lock the thread state, change it, and make our assertions.
+        synchronized (lock) {
+            final Field stateField = StreamThread.class.getDeclaredField("state");
+            stateField.setAccessible(true);
+            stateField.set(streamThread, State.PARTITIONS_ASSIGNED);
+
+            final StateQueryResult<Boolean> result =
+                IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+                    kafkaStreams,
+                    request,
+                    partitions
+                );
+
+            assertThat(result.getPartitionResults().keySet(), is(partitions));
+            for (final Integer partition : partitions) {
+                assertThat(result.getPartitionResults().get(partition).isFailure(), is(true));
+                assertThat(
+                    result.getPartitionResults().get(partition).getFailureReason(),
+                    is(FailureReason.NOT_UP_TO_BOUND)
+                );
+                assertThat(
+                    result.getPartitionResults().get(partition).getFailureMessage(),
+                    is("Query requires a running active task,"
+                        + " but partition was in state PARTITIONS_ASSIGNED and was active.")
+                );
+            }
+        }
+    }
+
+    @Test
+    public void shouldFetchFromPartition() {
+        final PingQuery query = new PingQuery();
+        final int partition = 1;
+        final Set<Integer> partitions = singleton(partition);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).withPartitions(partitions);
+
+        kafkaStreams.start();
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
+    }
+
+    @Test
+    public void shouldFetchExplicitlyFromAllPartitions() {
+        final PingQuery query = new PingQuery();
+        final Set<Integer> partitions = mkSet(0, 1);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).withAllPartitions();
+
+        kafkaStreams.start();
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
+    }
+
+    @Test
+    public void shouldNotRequireQueryHandler() {
+        final PingQuery query = new PingQuery();
+        final int partition = 1;
+        final Set<Integer> partitions = singleton(partition);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).withPartitions(partitions);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.table(
+            INPUT_TOPIC_NAME,
+            Consumed.with(Serdes.Integer(), Serdes.Integer()),
+            Materialized.as(new KeyValueBytesStoreSupplier() {
+                @Override
+                public String name() {
+                    return STORE_NAME;
+                }
+
+                @Override
+                public KeyValueStore<Bytes, byte[]> get() {
+                    return new KeyValueStore<Bytes, byte[]>() {

Review comment:
       This takes up a lot of real estate, but what we're doing is creating a custom state store implementation that for sure doesn't implement the handler, which lets us verify the default behavior when stores don't implement the handler.
   
   I didn't want to use any pre-existing store implementation for this, just in case we actually do decide to add a query handler to that store in the future.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsNotStartedException;
+import org.apache.kafka.streams.errors.StreamsStoppedException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Collections.singleton;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.table(
+            INPUT_TOPIC_NAME,
+            Consumed.with(Serdes.Integer(), Serdes.Integer()),
+            Materialized.as(STORE_NAME)
+        );
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+        kafkaStreams.cleanUp();
+    }
+
+    @After
+    public void afterTest() {
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void shouldFailUnknownStore() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore("unknown-store").withQuery(query);
+
+        assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailNotStarted() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailStopped() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        kafkaStreams.start();
+        kafkaStreams.close(Duration.ZERO);
+        assertThrows(StreamsStoppedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldRejectNonRunningActive()
+        throws NoSuchFieldException, IllegalAccessException {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).requireActive();
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        kafkaStreams.start();
+
+        final Field threadsField = KafkaStreams.class.getDeclaredField("threads");
+        threadsField.setAccessible(true);
+        @SuppressWarnings("unchecked") final List<StreamThread> threads =
+            (List<StreamThread>) threadsField.get(kafkaStreams);
+        final StreamThread streamThread = threads.get(0);
+
+        final Field stateLock = StreamThread.class.getDeclaredField("stateLock");
+        stateLock.setAccessible(true);
+        final Object lock = stateLock.get(streamThread);
+
+        // wait for the desired partitions to be assigned
+        IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+            kafkaStreams,
+            inStore(STORE_NAME).withQuery(query),
+            partitions
+        );
+
+        // then lock the thread state, change it, and make our assertions.
+        synchronized (lock) {

Review comment:
       Since we're holding this lock during the test, Streams itself will not be able to change state, which means it won't be able to surprise us by starting/ending a rebalance.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsNotStartedException;
+import org.apache.kafka.streams.errors.StreamsStoppedException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Collections.singleton;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.table(
+            INPUT_TOPIC_NAME,
+            Consumed.with(Serdes.Integer(), Serdes.Integer()),
+            Materialized.as(STORE_NAME)
+        );
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+        kafkaStreams.cleanUp();
+    }
+
+    @After
+    public void afterTest() {
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void shouldFailUnknownStore() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore("unknown-store").withQuery(query);
+
+        assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailNotStarted() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailStopped() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        kafkaStreams.start();
+        kafkaStreams.close(Duration.ZERO);

Review comment:
       calling close changes the state immediately to `pending_shutdown`, so we can verify the exception without waiting for the actual close to complete.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class IQv2StoreIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+
+    public static class UnknownQuery implements Query<Void> {
+
+    }
+
+    private final boolean cache;
+    private final boolean log;
+
+    private KafkaStreams kafkaStreams;
+
+    public enum StoresToTest {
+        GLOBAL_IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+        },
+        ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+        },
+        TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        TIME_ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
+                    WINDOW_SIZE, false);
+            }
+        },
+        IN_MEMORY_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        },
+        ROCKS_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        };
+
+        public abstract StoreSupplier<?> supplier();
+
+        public boolean global() {
+            return false;
+        }
+    }
+
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
+            for (final boolean logEnabled : Arrays.asList(true, false)) {
+                for (final StoresToTest toTest : StoresToTest.values()) {
+                    values.add(new Object[]{cacheEnabled, logEnabled, toTest.name()});
+                }
+            }
+        }
+        return values;
+    }
+
+    public IQv2StoreIntegrationTest(
+        final boolean cache,
+        final boolean log,
+        final String storeToTest) {
+
+        this.cache = cache;
+        this.log = log;
+        this.storeToTest = StoresToTest.valueOf(storeToTest);
+    }
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before

Review comment:
       For each store configuration, we will need to run a slightly different Streams application, so before each test, we create Streams and then tear it down (and clean up) afterwards.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <R> QueryResult<R> handleBasicQueries(

Review comment:
       This is the stub implementation I added to all the stores. It just returns `true` if it gets a ping query and returns an "unknown" query otherwise.
   
   I kind of wanted to establish some other pattern for efficiently dispatching queries for other store types, but this PR was already too long, so I just kept it simple.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsNotStartedException;
+import org.apache.kafka.streams.errors.StreamsStoppedException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Collections.singleton;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.table(
+            INPUT_TOPIC_NAME,
+            Consumed.with(Serdes.Integer(), Serdes.Integer()),
+            Materialized.as(STORE_NAME)
+        );
+
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+        kafkaStreams.cleanUp();
+    }
+
+    @After
+    public void afterTest() {
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void shouldFailUnknownStore() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore("unknown-store").withQuery(query);
+
+        assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailNotStarted() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldFailStopped() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+
+        kafkaStreams.start();
+        kafkaStreams.close(Duration.ZERO);
+        assertThrows(StreamsStoppedException.class, () -> kafkaStreams.query(request));
+    }
+
+    @Test
+    public void shouldRejectNonRunningActive()

Review comment:
       Ok, this test is kind of crazy. I needed to reliably "catch" Streams when the thread hosting a task is not in RUNNING state. There's really no good way to do that, so I had two options:
   
   1. Add a test to KafkaStreamsTest, which has an ultra-complicated mock setup, which could in theory let me drop into the right code path for this test.
   2. Use reflection to manually force a non-mocked KafkaStreams into the right state to verify this behavior.
   
   I'd obviously normally favor the mocked approach, but it really is super complicated in the case of KafkaStreamsTest. Despite being weirder, this approach is a lot simpler.
   
   Neither approach should be more brittle than the other, since the mocks are also using reflection and will only be verified at runtime. This is probably just my confirmation bias, but I think this approach may be less brittle in practice, since we're just surgically messing with three internals instead of the dozen or so mocks and static mocks required to get a functioning KafkaStreams for the other approach.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -104,6 +107,40 @@
     public static final long DEFAULT_TIMEOUT = 60 * 1000L;
     private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);
 
+    /**
+     * Repeatedly runs the query until the response is valid and then return the response.
+     * <p>
+     * Validity in this case means that the response contains all the desired partitions or that
+     * it's a global response.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForPartitionsOrGlobal(

Review comment:
       The javadoc pretty much sums this up. It's a harbinger of good things to come, since as you can see, we'll no longer be vulnerable to race conditions like rebalances sneaking in during the test. We can just wait for a valid result and then verify it.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class IQv2StoreIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+
+    public static class UnknownQuery implements Query<Void> {
+
+    }
+
+    private final boolean cache;
+    private final boolean log;
+
+    private KafkaStreams kafkaStreams;
+
+    public enum StoresToTest {
+        GLOBAL_IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+        },
+        ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+        },
+        TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        TIME_ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
+                    WINDOW_SIZE, false);
+            }
+        },
+        IN_MEMORY_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        },
+        ROCKS_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        };
+
+        public abstract StoreSupplier<?> supplier();
+
+        public boolean global() {
+            return false;
+        }
+    }
+
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
+            for (final boolean logEnabled : Arrays.asList(true, false)) {
+                for (final StoresToTest toTest : StoresToTest.values()) {
+                    values.add(new Object[]{cacheEnabled, logEnabled, toTest.name()});
+                }
+            }
+        }
+        return values;
+    }
+
+    public IQv2StoreIntegrationTest(
+        final boolean cache,
+        final boolean log,
+        final String storeToTest) {
+
+        this.cache = cache;
+        this.log = log;
+        this.storeToTest = StoresToTest.valueOf(storeToTest);
+    }
+
+    @BeforeClass

Review comment:
       To save time, we start Kafka and create and populate our input topic just once before all tests. None of the tests require writing new data, so this works.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {

Review comment:
       This adds the public Position API that was proposed in the KIP. I've also gone ahead and removed the private Position api (from the `state.internals` package).

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -89,6 +93,20 @@ Position getPosition() {
         return position;
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this
+        );

Review comment:
       I wanted to verify that the framework works correctly, but not get into adding (and testing) a real query right now, so I just added minimal query support to all stores in this PR.
   
   I'd expect us to change these implementations to something else once kv/window/session stores start to handle different sets of queries, but for now, they all have a stub implementation that just responds to the (internal) PingQuery.
   
   Note again: the position bound is not enforced in this PR, which is already way too long. We'll add that on (and test it) in a follow-on PR.

##########
File path: streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+public class PositionTest {
+
+    @Test
+    public void shouldCreateFromMap() {
+        final Map<String, Map<Integer, Long>> map = mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        );
+
+        final Position position = Position.fromMap(map);
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+
+        // Should be a copy of the constructor map
+
+        map.get("topic1").put(99, 99L);
+
+        // so the position is still the original one
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldCreateFromNullMap() {
+        final Position position = Position.fromMap(null);
+        assertThat(position.getTopics(), equalTo(Collections.emptySet()));
+    }
+
+    @Test
+    public void shouldMerge() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position position1 = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 7L))), // update offset
+            mkEntry("topic1", mkMap(mkEntry(8, 1L))), // add partition
+            mkEntry("topic2", mkMap(mkEntry(9, 5L))) // add topic
+        ));
+
+        final Position merged = position.merge(position1);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+    }
+
+    @Test
+    public void shouldCopy() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position copy = position.copy();
+
+        // mutate original
+        position.withComponent("topic", 0, 6L);
+        position.withComponent("topic1", 8, 1L);
+        position.withComponent("topic2", 2, 4L);
+
+        // copy has not changed
+        assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(copy.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+
+        // original has changed
+        assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
+        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+        assertThat(position.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L),
+            mkEntry(8, 1L)
+        )));
+        assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
+    }
+
+    @Test
+    public void shouldMergeNull() {
+        final Position position = Position.fromMap(mkMap(
+            mkEntry("topic", mkMap(mkEntry(0, 5L))),
+            mkEntry("topic1", mkMap(
+                mkEntry(0, 5L),
+                mkEntry(7, 0L)
+            ))
+        ));
+
+        final Position merged = position.merge(null);
+
+        assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
+        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+            mkEntry(0, 5L),
+            mkEntry(7, 0L)
+        )));
+    }
+
+    @Test
+    public void shouldMatchOnEqual() {
+        final Position position1 = Position.emptyPosition();
+        final Position position2 = Position.emptyPosition();
+        position1.withComponent("topic1", 0, 1);
+        position2.withComponent("topic1", 0, 1);
+
+        position1.withComponent("topic1", 1, 2);
+        position2.withComponent("topic1", 1, 2);
+
+        position1.withComponent("topic1", 2, 1);
+        position2.withComponent("topic1", 2, 1);
+
+        position1.withComponent("topic2", 0, 0);
+        position2.withComponent("topic2", 0, 0);
+
+        assertEquals(position1, position2);
+    }
+
+    @Test
+    public void shouldNotMatchOnUnEqual() {
+        final Position position1 = Position.emptyPosition();
+        final Position position2 = Position.emptyPosition();
+        position1.withComponent("topic1", 0, 1);
+        position2.withComponent("topic1", 0, 1);
+
+        position1.withComponent("topic1", 1, 2);
+
+        position1.withComponent("topic1", 2, 1);
+        position2.withComponent("topic1", 2, 1);
+
+        position1.withComponent("topic2", 0, 0);
+        position2.withComponent("topic2", 0, 0);
+
+        assertNotEquals(position1, position2);
+    }
+
+    @Test
+    public void shouldNotMatchNull() {
+        final Position position = Position.emptyPosition();
+        assertNotEquals(position, null);
+    }
+
+    @Test
+    public void shouldMatchSelf() {
+        final Position position = Position.emptyPosition();
+        assertEquals(position, position);
+    }
+
+    @Test
+    public void shouldNotHash() {

Review comment:
       note: since this class is mutable, I wanted to make sure we don't accidentally use it in sets or anything.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class IQv2StoreIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+
+    public static class UnknownQuery implements Query<Void> {
+
+    }
+
+    private final boolean cache;
+    private final boolean log;
+
+    private KafkaStreams kafkaStreams;
+
+    public enum StoresToTest {
+        GLOBAL_IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+        },
+        ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+        },
+        TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        TIME_ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
+                    WINDOW_SIZE, false);
+            }
+        },
+        IN_MEMORY_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        },
+        ROCKS_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        };
+
+        public abstract StoreSupplier<?> supplier();
+
+        public boolean global() {
+            return false;
+        }
+    }
+
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
+            for (final boolean logEnabled : Arrays.asList(true, false)) {
+                for (final StoresToTest toTest : StoresToTest.values()) {
+                    values.add(new Object[]{cacheEnabled, logEnabled, toTest.name()});
+                }
+            }
+        }
+        return values;
+    }
+
+    public IQv2StoreIntegrationTest(
+        final boolean cache,
+        final boolean log,
+        final String storeToTest) {
+
+        this.cache = cache;
+        this.log = log;
+        this.storeToTest = StoresToTest.valueOf(storeToTest);
+    }
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final StoreSupplier<?> supplier = storeToTest.supplier();
+        if (supplier instanceof KeyValueBytesStoreSupplier) {
+            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
+                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (storeToTest.global()) {
+                builder
+                    .globalTable(
+                        INPUT_TOPIC_NAME,
+                        Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        materialized);
+            } else {
+                builder
+                    .table(
+                        INPUT_TOPIC_NAME,
+                        Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        materialized);
+            }
+        } else if (supplier instanceof WindowBytesStoreSupplier) {
+            final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
+                Materialized.as((WindowBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            builder
+                .stream(
+                    INPUT_TOPIC_NAME,
+                    Consumed.with(Serdes.Integer(), Serdes.Integer())
+                )
+                .groupByKey()
+                .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
+                .aggregate(
+                    () -> 0,
+                    (key, value, aggregate) -> aggregate + value,
+                    materialized
+                );
+        } else if (supplier instanceof SessionBytesStoreSupplier) {
+            final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
+                Materialized.as((SessionBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            builder
+                .stream(
+                    INPUT_TOPIC_NAME,
+                    Consumed.with(Serdes.Integer(), Serdes.Integer())
+                )
+                .groupByKey()
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
+                .aggregate(
+                    () -> 0,
+                    (key, value, aggregate) -> aggregate + value,
+                    (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                    materialized
+                );
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
+
+        // Don't need to wait for running, since tests can use iqv2 to wait until they
+        // get a valid response.
+        kafkaStreams =
+            IntegrationTestUtils.getStartedStreams(
+                streamsConfiguration(
+                    cache,
+                    log,
+                    supplier.getClass().getSimpleName()
+                ),
+                builder,
+                true
+            );
+    }
+
+    @After
+    public void afterTest() {
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void verifyStore() {
+        shouldRejectUnknownQuery();
+        shouldHandlePingQuery();
+        shouldCollectExecutionInfo();
+        shouldCollectExecutionInfoUnderFailure();
+    }

Review comment:
       This was a bit of a bummer for me. I was sad to learn that JUnit has no concept of Before/After Instance, so there's no way to set up the kafkaStreams application just once per store config and then run all the tests; it would have repeated the setup on each test in this class.
   
   This is super valuable, since creating/destroying Streams can take up to a second, while running these checks just takes a few milliseconds. So we'd be spending most of our time in setup/teardown, and considering the number of parameters, it'd be a lot of time.
   
   So, the workaround is to have just one `Test`, which calls all the checks we care about. It's slightly worse for debugging later, but the stacktrace will still tell us which check failed.

##########
File path: streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class PositionBoundTest {

Review comment:
       Just a basic unit test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -89,6 +93,20 @@ Position getPosition() {
         return position;
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this
+        );

Review comment:
       You can expect to see an identical stub in all the other provided store types.

##########
File path: streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+
+public class PositionTest {

Review comment:
       Another basic unit test, based on the internal API one that I removed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,100 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {

Review comment:
       This implementation may look inefficient, but it's actually identical to the way that interactive queries are currently served. I think it could be improved, but I'm leaving it to future PRs.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.PingQuery;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.matchesPattern;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class IQv2StoreIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+
+    public static class UnknownQuery implements Query<Void> {
+
+    }
+
+    private final boolean cache;
+    private final boolean log;
+
+    private KafkaStreams kafkaStreams;
+
+    public enum StoresToTest {
+        GLOBAL_IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        GLOBAL_TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean global() {
+                return true;
+            }
+        },
+        IN_MEMORY_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_LRU {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.lruMap(STORE_NAME, 100);
+            }
+        },
+        ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+        },
+        TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+        },
+        IN_MEMORY_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                    false);
+            }
+        },
+        TIME_ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
+                    WINDOW_SIZE, false);
+            }
+        },
+        IN_MEMORY_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        },
+        ROCKS_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+        };
+
+        public abstract StoreSupplier<?> supplier();
+
+        public boolean global() {
+            return false;
+        }
+    }
+
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
+            for (final boolean logEnabled : Arrays.asList(true, false)) {
+                for (final StoresToTest toTest : StoresToTest.values()) {
+                    values.add(new Object[]{cacheEnabled, logEnabled, toTest.name()});
+                }
+            }
+        }
+        return values;
+    }
+
+    public IQv2StoreIntegrationTest(
+        final boolean cache,
+        final boolean log,
+        final String storeToTest) {
+
+        this.cache = cache;
+        this.log = log;
+        this.storeToTest = StoresToTest.valueOf(storeToTest);
+    }
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+        CLUSTER.start();
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 3; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        Time.SYSTEM.milliseconds(),
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 0L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final StoreSupplier<?> supplier = storeToTest.supplier();
+        if (supplier instanceof KeyValueBytesStoreSupplier) {
+            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
+                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (storeToTest.global()) {
+                builder
+                    .globalTable(
+                        INPUT_TOPIC_NAME,
+                        Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        materialized);
+            } else {
+                builder
+                    .table(
+                        INPUT_TOPIC_NAME,
+                        Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        materialized);
+            }
+        } else if (supplier instanceof WindowBytesStoreSupplier) {
+            final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
+                Materialized.as((WindowBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            builder
+                .stream(
+                    INPUT_TOPIC_NAME,
+                    Consumed.with(Serdes.Integer(), Serdes.Integer())
+                )
+                .groupByKey()
+                .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
+                .aggregate(
+                    () -> 0,
+                    (key, value, aggregate) -> aggregate + value,
+                    materialized
+                );
+        } else if (supplier instanceof SessionBytesStoreSupplier) {
+            final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
+                Materialized.as((SessionBytesStoreSupplier) supplier);
+
+            if (cache) {
+                materialized.withCachingEnabled();
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            if (log) {
+                materialized.withLoggingEnabled(Collections.emptyMap());
+            } else {
+                materialized.withCachingDisabled();
+            }
+
+            builder
+                .stream(
+                    INPUT_TOPIC_NAME,
+                    Consumed.with(Serdes.Integer(), Serdes.Integer())
+                )
+                .groupByKey()
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
+                .aggregate(
+                    () -> 0,
+                    (key, value, aggregate) -> aggregate + value,
+                    (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                    materialized
+                );
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
+
+        // Don't need to wait for running, since tests can use iqv2 to wait until they
+        // get a valid response.
+        kafkaStreams =
+            IntegrationTestUtils.getStartedStreams(
+                streamsConfiguration(
+                    cache,
+                    log,
+                    supplier.getClass().getSimpleName()
+                ),
+                builder,
+                true
+            );
+    }
+
+    @After
+    public void afterTest() {
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void verifyStore() {
+        shouldRejectUnknownQuery();
+        shouldHandlePingQuery();
+        shouldCollectExecutionInfo();
+        shouldCollectExecutionInfoUnderFailure();
+    }
+
+    public void shouldRejectUnknownQuery() {
+
+        final UnknownQuery query = new UnknownQuery();
+        final StateQueryRequest<Void> request = inStore(STORE_NAME).withQuery(query);
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        final StateQueryResult<Void> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        makeAssertions(
+            partitions,
+            result,
+            queryResult -> {
+                assertThat(queryResult.isFailure(), is(true));
+                assertThat(queryResult.isSuccess(), is(false));
+                assertThat(queryResult.getPosition(), is(nullValue()));
+                assertThat(queryResult.getFailureReason(),
+                    is(FailureReason.UNKNOWN_QUERY_TYPE));
+                assertThat(queryResult.getFailureMessage(),
+                    matchesPattern(
+                        "This store (.*)"
+                            + " doesn't know how to execute the given query"
+                            + " (.*)."
+                            + " Contact the store maintainer if you need support for a new query type."
+                    )
+                );
+                assertThrows(IllegalArgumentException.class, queryResult::getResult);
+
+                assertThat(queryResult.getExecutionInfo(), is(empty()));
+            }
+        );
+    }
+
+    public void shouldHandlePingQuery() {
+
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query);
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        makeAssertions(
+            partitions,
+            result,
+            queryResult -> {
+                final boolean failure = queryResult.isFailure();
+                if (failure) {
+                    assertThat(queryResult.toString(), failure, is(false));
+                }
+                assertThat(queryResult.isSuccess(), is(true));
+
+                // TODO: position not implemented
+                assertThat(queryResult.getPosition(), is(nullValue()));
+
+                assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+                assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
+
+                assertThat(queryResult.getResult(), is(true));
+
+                assertThat(queryResult.getExecutionInfo(), is(empty()));
+            });
+    }
+
+    public void shouldCollectExecutionInfo() {
+
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        final StateQueryResult<Boolean> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        makeAssertions(
+            partitions,
+            result,
+            queryResult -> assertThat(queryResult.getExecutionInfo(), not(empty()))
+        );
+    }
+
+    public void shouldCollectExecutionInfoUnderFailure() {
+
+        final UnknownQuery query = new UnknownQuery();
+        final StateQueryRequest<Void> request =
+            inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
+        final Set<Integer> partitions = mkSet(0, 1);
+
+        final StateQueryResult<Void> result =
+            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+
+        makeAssertions(
+            partitions,
+            result,
+            queryResult -> assertThat(queryResult.getExecutionInfo(), not(empty()))
+        );
+    }
+
+    private <R> void makeAssertions(
+        final Set<Integer> partitions,
+        final StateQueryResult<R> result,
+        final Consumer<QueryResult<R>> assertion) {
+
+        if (result.getGlobalResult() != null) {
+            assertion.accept(result.getGlobalResult());
+        } else {
+            assertThat(result.getPartitionResults().keySet(), is(partitions));
+            for (final Integer partition : partitions) {
+                assertion.accept(result.getPartitionResults().get(partition));
+            }
+        }
+    }
+
+    private static Properties streamsConfiguration(final boolean cache, final boolean log,
+        final String supplier) {
+        final String safeTestName =
+            IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier;

Review comment:
       Note: to prevent tests from polluting each other, it's important for each test to have a unique application name and state directory.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] patrickstuedi commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760218888



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Note: enforces monotonicity on offsets. I.e., if there is already a component for the same
+     * topic and partition with a larger offset, the update will succeed but not overwrite the
+     * offset.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .compute(
+                partition,
+                (integer, prior) -> prior == null || offset > prior ? offset : prior

Review comment:
       Nice, this avoids the use of AtomicLong I guess! 
   
   Or, what about this case, is this safe?
   Long offset = position.get("topic").get(partition);
   if (offset > X){ //let's say there was some concurrent withComponent call on the same topic/partition here
   ..do something
   }

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Note: enforces monotonicity on offsets. I.e., if there is already a component for the same
+     * topic and partition with a larger offset, the update will succeed but not overwrite the
+     * offset.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .compute(
+                partition,
+                (integer, prior) -> prior == null || offset > prior ? offset : prior
+            );
+        return this;
+    }
+
+    /**
+     * Create a deep copy of the Position.
+     */
+    public Position copy() {
+        return new Position(deepCopy(position));

Review comment:
       Why not implement copy using merge?
   Position pos = Position.empyPosition();
   pos.merge(this);




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762147628



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {

Review comment:
       Oops! I'm sorry about this and the other gaps you pointed out. I did make a couple of passes to compare with the KIP, but I obviously missed some things.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760395779



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,100 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(

Review comment:
       Thanks for the review!
   
   We can just add the failure right away because tasks are uniquely assigned to Streams instances, so it's not possible to find a single task twice in this loop.
   
   That's a good point about exiting as soon at the query is complete. I've updated the PR.
   
   Also, that's a good point about the failure reason. I previously had a specific PositionBound to "requireLatest" (aka "require active"), but I moved it into the StateQueryRequest, since enforcing the task state is the responsibility of the framework, not a store. However, I never changed this FailureReason. I've fixed it with a new reason in the PR. Good catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762162581



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {

Review comment:
       I actually decided not to for the classes you pointed out because they are all trivial "struct"-type classes with no logic. I did, however, verify that all the new code has 100% (or as close as possible) code coverage via the existing set of tests in this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762169023



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Objects;
+
+/**
+ * A class bounding the processing state {@link Position} during queries. This can be used to
+ * specify that a query should fail if the locally available partition isn't caught up to the
+ * specified bound. "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
+public class PositionBound {

Review comment:
       Sure thing!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760406552



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Note: enforces monotonicity on offsets. I.e., if there is already a component for the same
+     * topic and partition with a larger offset, the update will succeed but not overwrite the
+     * offset.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .compute(
+                partition,
+                (integer, prior) -> prior == null || offset > prior ? offset : prior
+            );
+        return this;
+    }
+
+    /**
+     * Create a deep copy of the Position.
+     */
+    public Position copy() {
+        return new Position(deepCopy(position));

Review comment:
       Thanks for the review!
   
   That would also work, but this seems more direct.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760231430



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,100 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(

Review comment:
       Why do we add a failure here? The for loop will continue iterating and it might find an active task that hosts that store partition later, right? Don't you want to add the failure at the end of the loop if no such active partition is found? 
   Moreover, don't you want to exit the loop after the failure?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762037170



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries. This wraps the individual partition results, as well
+ * as metadata relating to the result as a whole.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+
+    private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query. Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {

Review comment:
       Gah! I'm sorry for that miss. I had it in the POC from the beginning, but overlooked it in the KIP.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762148443



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Position.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A representation of a position vector with respect to a set of topic partitions. For example, in
+ * Interactive Query ({@link org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}, a
+ * query result may contain information from multiple store partitions, each of which contains
+ * information from multiple input topics' partitions. This class can be used to summarize all of
+ * that positional information.
+ * <p>
+ * This class is threadsafe, although it is mutable. Readers are recommended to use {@link
+ * Position#copy()} to avoid seeing mutations to the Position after they get the reference. For
+ * examples, when a store executes a {@link org.apache.kafka.streams.processor.StateStore#query(Query,
+ * PositionBound, boolean)} request and returns its current position via {@link
+ * QueryResult#setPosition(Position)}, it should pass a copy of its position instead of the mutable
+ * reference.
+ */
+@Evolving
+public class Position {
+
+    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position;
+
+    private Position(final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> position) {
+        this.position = position;
+    }
+
+    /**
+     * Create a new, empty Position.
+     */
+    public static Position emptyPosition() {
+        return new Position(new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Create a new Position and populate it with a mapping of topic -> partition -> offset.
+     * <p>
+     * Note, the resulting Position does not share any structure with the provided map, so
+     * subsequent changes to the map or Position will not affect the other.
+     */
+    public static Position fromMap(final Map<String, ? extends Map<Integer, Long>> map) {
+        return new Position(deepCopy(map));
+    }
+
+    /**
+     * Augment an existing Position by setting a new offset for a topic and partition.
+     * <p>
+     * Note: enforces monotonicity on offsets. I.e., if there is already a component for the same
+     * topic and partition with a larger offset, the update will succeed but not overwrite the
+     * offset.
+     * <p>
+     * Returns a self-reference for chained calls. Note: this method mutates the Position.
+     */
+    public Position withComponent(final String topic, final int partition, final long offset) {
+        position
+            .computeIfAbsent(topic, k -> new ConcurrentHashMap<>())
+            .compute(
+                partition,
+                (integer, prior) -> prior == null || offset > prior ? offset : prior
+            );
+        return this;
+    }
+
+    /**
+     * Create a deep copy of the Position.
+     */
+    public Position copy() {
+        return new Position(deepCopy(position));
+    }
+
+    /**
+     * Create a new, structurally independent Position that is the result of merging two other
+     * Positions.
+     * <p>
+     * If both Positions contain the same topic -> partition -> offset mapping, the resulting
+     * Position will contain a mapping with the larger of the two offsets.
+     */
+    public Position merge(final Position other) {
+        if (other == null) {
+            return this;

Review comment:
       Actually, I think it'd be better to go ahead and return a copy and not have a special case in the contract. Good catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762146251



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            final Set<Integer> handledPartitions = new HashSet<>();
+
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(
+                                        FailureReason.NOT_ACTIVE,
+                                        "Query requires a running active task,"
+                                            + " but partition was in state "
+                                            + state + " and was "
+                                            + (active ? "active" : "not active") + "."
+                                    )
+                                );
+                            } else {
+                                final QueryResult<R> r = store.query(
+                                    request.getQuery(),
+                                    request.isRequireActive()
+                                        ? PositionBound.unbounded()
+                                        : request.getPositionBound(),
+                                    request.executionInfoEnabled()
+                                );
+                                result.addResult(partition, r);
+                            }
+                        }
+
+                        // optimization: if we have handled all the requested partitions,
+                        // we can return right away.
+                        handledPartitions.add(partition);
+                        if (!request.isAllPartitions()
+                            && handledPartitions.containsAll(request.getPartitions())) {
+                            return result;
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }

Review comment:
       For sure! I was actually hoping that we would have a chance to make this logic more efficient, so I didn't bother making it too pretty, but you're totally right. There's no guarantee on when or if we'll refactor it, so we should make it nice now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762153447



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Objects;
+
+/**
+ * A class bounding the processing state {@link Position} during queries. This can be used to
+ * specify that a query should fail if the locally available partition isn't caught up to the
+ * specified bound. "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
+public class PositionBound {
+
+    private final Position position;
+    private final boolean unbounded;
+
+    private PositionBound(final Position position, final boolean unbounded) {
+        if (unbounded && position != null) {
+            throw new IllegalArgumentException();
+        } else if (position != null) {
+            this.position = position.copy();
+            this.unbounded = false;
+        } else {
+            this.position = null;
+            this.unbounded = unbounded;
+        }
+    }
+
+    /**
+     * Creates a new PositionBound representing "no bound"
+     */
+    public static PositionBound unbounded() {
+        return new PositionBound(null, true);
+    }
+
+    /**
+     * Creates a new PositionBound representing a specific position.
+     */
+    public static PositionBound at(final Position position) {
+        return new PositionBound(position, false);
+    }
+
+    /**
+     * Returns true iff this object specifies that there is no position bound.
+     */
+    public boolean isUnbounded() {
+        return unbounded;
+    }
+
+    /**
+     * Returns the specific position of this bound.
+     *
+     * @throws IllegalArgumentException if this is an "unbounded" position.
+     */
+    public Position position() {
+        if (unbounded) {
+            throw new IllegalArgumentException(

Review comment:
       Yeah, I realize this is a bit creative. I was thinking of `this` as the argument to the method for this case.
   
   ISE seems to imply that we're in some kind of unexpected edge case, and the docs imply that it's probably timing related. Neither of those are appropriate here, since, if you create an unbounded PositionBound, it's simply illegal to ask for its position.
   
   It seems "the people" are also divided on this point (https://stackoverflow.com/questions/30086604/which-exception-to-throw-if-method-is-not-appropriate-for-current-object)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762154016



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/Query.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+/**
+ * Marker interface that all interactive queries must implement (see {@link
+ * org.apache.kafka.streams.KafkaStreams#query(StateQueryRequest)}).
+ * <p>
+ * You can find all available queries by searching for classes implementing this interface.
+ * <p>
+ * Kafka Streams will pass unknown query types straight through into the bytes stores, so callers
+ * can add custom queries by implementing this interface and providing custom stores that handle
+ * them (via {@link org.apache.kafka.streams.state.StoreSupplier}s.
+ * <p>
+ * See KIP-796 (https://cwiki.apache.org/confluence/x/34xnCw) for more details.
+ *
+ * @param <R> The type of the result returned by this query.
+ */
+public interface Query<R> {

Review comment:
       Yep!

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {

Review comment:
       Yes. Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762154915



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class QueryResult<R> {
+
+    private final List<String> executionInfo = new LinkedList<>();
+    private final FailureReason failureReason;
+    private final String failure;
+    private final R result;
+    private Position position;
+
+    private QueryResult(final R result) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+    }
+
+    private QueryResult(final FailureReason failureReason, final String failure) {
+        this.result = null;
+        this.failureReason = failureReason;
+        this.failure = failure;
+    }
+
+    /**
+     * Static factory method to create a result object for a successful query. Used by StateStores
+     * to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forResult(final R result) {
+        return new QueryResult<>(result);
+    }
+
+    /**
+     * Static factory method to create a result object for a failed query. Used by StateStores to
+     * respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forFailure(
+        final FailureReason failureReason,
+        final String failureMessage) {
+
+        return new QueryResult<>(failureReason, failureMessage);
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store does
+     * not know how to handle the query.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> forUnknownQueryType(
+        final Query<R> query,
+        final StateStore store) {
+
+        return new QueryResult<>(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            "This store (" + store.getClass() + ") doesn't know how to execute "
+                + "the given query (" + query + ")." +
+                " Contact the store maintainer if you need support for a new query type.");
+    }
+
+    /**
+     * Static factory method to create a failed query result object to indicate that the store has
+     * not yet caught up to the requested position bound.
+     * <p>
+     * Used by StateStores to respond to a {@link StateStore#query(Query, PositionBound, boolean)}.
+     */
+    public static <R> QueryResult<R> notUpToBound(
+        final Position currentPosition,
+        final PositionBound positionBound,
+        final int partition) {
+
+        return new QueryResult<>(
+            FailureReason.NOT_UP_TO_BOUND,
+            "For store partition " + partition + ", the current position "
+                + currentPosition + " is not yet up to the bound "
+                + positionBound
+        );
+    }
+
+    /**
+     * Used by stores to add detailed execution information (if requested) during query execution.
+     */
+    public void addExecutionInfo(final String message) {
+        executionInfo.add(message);
+    }
+
+    /**
+     * Used by stores to report what exact position in the store's history it was at when it
+     * executed the query.
+     */
+    public void setPosition(final Position position) {
+        this.position = position;
+    }
+
+    /**
+     * True iff the query was successfully executed. The response is available in {@link
+     * this#getResult()}.
+     */
+    public boolean isSuccess() {
+        return failureReason == null;
+    }
+
+
+    /**
+     * True iff the query execution failed. More information about the failure is available in
+     * {@link this#getFailureReason()} and {@link this#getFailureMessage()}.
+     */
+    public boolean isFailure() {
+        return failureReason != null;
+    }
+
+    /**
+     * If detailed execution information was requested in {@link StateQueryRequest#enableExecutionInfo()},
+     * this method returned the execution details for this partition's result.
+     */
+    public List<String> getExecutionInfo() {
+        return executionInfo;
+    }
+
+    /**
+     * This state partition's exact position in its history when this query was executed. Can be
+     * used in conjunction with subsequent queries via {@link StateQueryRequest#withPositionBound(PositionBound)}.
+     * <p>
+     * Note: stores are encouraged, but not required to set this property.
+     */
+    public Position getPosition() {
+        return position;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the reason.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public FailureReason getFailureReason() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure reason because this query did not fail."
+            );
+        }
+        return failureReason;
+    }
+
+    /**
+     * If this partition failed to execute the query, returns the failure message.
+     *
+     * @throws IllegalArgumentException if this is not a failed result.
+     */
+    public String getFailureMessage() {
+        if (!isFailure()) {
+            throw new IllegalArgumentException(
+                "Cannot get failure message because this query did not fail."
+            );
+        }
+        return failure;
+    }
+
+    /**
+     * Returns the result of executing the query on one partition. The result type is determined by
+     * the query. Note: queries may choose to return {@code null} for a successful query, so {@link
+     * this#isSuccess()} and {@link this#isFailure()} must be used to determine whether the query
+     * was successful of failed on this partition.
+     *
+     * @throws IllegalArgumentException if this is not a successful query.
+     */
+    public R getResult() {
+        if (!isSuccess()) {
+            throw new IllegalArgumentException(
+                "Cannot get result for failed query. Failure is " + failureReason.name() + ": "
+                    + failure);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "QueryResult{" +
+            "executionInfo=" + executionInfo +
+            ", failureReason=" + failureReason +
+            ", failure='" + failure + '\'' +
+            ", result=" + result +
+            ", position=" + position +
+            '}';
+    }
+}

Review comment:
       Ah, I dropped it from the PR just because we don't need it for the queries in this PR. I was thinking to add it in a PR that actually implements some typed queries so it can be functionally tested.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r762370915



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <R> QueryResult<R> handleBasicQueries(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo,
+        final StateStore store) {
+
+        final QueryResult<R> result;
+        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
+        // TODO: position tracking
+        if (query instanceof PingQuery) {
+            result = (QueryResult<R>) QueryResult.forResult(true);

Review comment:
       Unfortunately, it's not possible to guarantee that using Java's type system. The query caller will just have to be sure to submit correctly typed queries based on their a priori knowledge of the store's types. If they get it wrong, they'll get a ClassCastException at run time. This is a bummer, but it's also precisely the same in IQ today.
   
   As far as I know, it's not possible to do better than this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org