You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/01 17:15:10 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

vvcephei commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760395779



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,100 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(

Review comment:
       Thanks for the review!
   
   We can just add the failure right away because tasks are uniquely assigned to Streams instances, so it's not possible to find a single task twice in this loop.
   
   That's a good point about exiting as soon at the query is complete. I've updated the PR.
   
   Also, that's a good point about the failure reason. I previously had a specific PositionBound to "requireLatest" (aka "require active"), but I moved it into the StateQueryRequest, since enforcing the task state is the responsibility of the framework, not a store. However, I never changed this FailureReason. I've fixed it with a new reason in the PR. Good catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org