Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/01 14:24:00 UTC

[GitHub] [kafka] vpapavas commented on a change in pull request #11557: KAFKA-13491: IQv2 framework

vpapavas commented on a change in pull request #11557:
URL: https://github.com/apache/kafka/pull/11557#discussion_r760231430



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1716,4 +1725,100 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    /**
+     * Run an interactive query against a state store.
+     * <p>
+     * This method allows callers outside of the Streams runtime to access the internal state of
+     * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
+     * for more information.
+     *
+     * @param <R> The result type specified by the query.
+     * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link
+     *                                    KafkaStreams#start()} and then retry this call.
+     * @throws StreamsStoppedException    If Streams is in a terminal state like PENDING_SHUTDOWN,
+     *                                    NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
+     *                                    discover a new instance to query.
+     * @throws UnknownStateStoreException If the specified store name does not exist in the
+     *                                    topology.
+     */
+    @Evolving
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store "
+                    + storeName
+                    + " because no such store is registered in the topology."
+            );
+        }
+        if (state().hasNotStarted()) {
+            throw new StreamsNotStartedException(
+                "KafkaStreams has not been started, you can retry after calling start()."
+            );
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new StreamsStoppedException(
+                "KafkaStreams has been stopped (" + state + ")."
+                    + " This instance can no longer serve queries."
+            );
+        }
+        final StateQueryResult<R> result = new StateQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            final QueryResult<R> r =
+                store.query(
+                    request.getQuery(),
+                    request.getPositionBound(),
+                    request.executionInfoEnabled()
+                );
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+                    final TaskId taskId = entry.getKey();
+                    final int partition = taskId.partition();
+                    if (request.isAllPartitions()
+                        || request.getPartitions().contains(partition)) {
+                        final Task task = entry.getValue();
+                        final StateStore store = task.getStore(storeName);
+                        if (store != null) {
+                            final StreamThread.State state = thread.state();
+                            final boolean active = task.isActive();
+                            if (request.isRequireActive()
+                                && (state != StreamThread.State.RUNNING
+                                || !active)) {
+                                result.addResult(
+                                    partition,
+                                    QueryResult.forFailure(

Review comment:
       Why do we add a failure here? The for loop will continue iterating until it finds an active task that hosts that store partition, right? Don't you want to add the failure only if no such active task is found?
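
       To make the question concrete, here is a minimal sketch of the alternative, not the PR's actual code. It assumes a simplified model: `TaskStub`, `DeferredFailureSketch`, and the string results are hypothetical stand-ins for `Task`, `StreamThread.State`, and `QueryResult`. Tasks that cannot serve the query are only remembered during the scan, and a failure is recorded after the loop for partitions that never produced a result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified model of the loop under review; none of these
// types are the real Kafka Streams classes.
final class DeferredFailureSketch {

    // Stand-in for a Task plus the state of the StreamThread that owns it.
    static final class TaskStub {
        final int partition;
        final boolean active;
        final boolean threadRunning;

        TaskStub(final int partition, final boolean active, final boolean threadRunning) {
            this.partition = partition;
            this.active = active;
            this.threadRunning = threadRunning;
        }
    }

    // Returns one result per partition. A failure is recorded only after all
    // threads and tasks have been scanned without finding a usable task.
    static Map<Integer, String> query(final List<List<TaskStub>> threads,
                                      final boolean requireActive) {
        final Map<Integer, String> results = new TreeMap<>();
        final Set<Integer> rejected = new TreeSet<>();

        for (final List<TaskStub> tasks : threads) {
            for (final TaskStub task : tasks) {
                final boolean usable = task.active && task.threadRunning;
                if (requireActive && !usable) {
                    // Remember the partition, but do not fail it yet.
                    rejected.add(task.partition);
                } else {
                    results.put(task.partition, "success");
                }
            }
        }

        // Fail only the partitions for which no usable task was ever found.
        for (final Integer partition : rejected) {
            results.putIfAbsent(partition, "failure: no running active task");
        }
        return results;
    }

    public static void main(final String[] args) {
        final List<List<TaskStub>> threads = Arrays.asList(
            // Thread 1 hosts standby copies of partitions 0 and 1.
            Arrays.asList(new TaskStub(0, false, true), new TaskStub(1, false, true)),
            // Thread 2 hosts the active copy of partition 0.
            Arrays.asList(new TaskStub(0, true, true))
        );
        System.out.println(query(threads, true));
    }
}
```

       In this model, partition 0 ends up with a success even though its standby copy is visited first, and partition 1 gets a failure because only a standby copy exists. Whether the real loop can encounter both a standby and an active task for the same partition on one instance is an open assumption here.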



