You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/22 13:37:33 UTC

[GitHub] [kafka] vpapavas commented on a change in pull request #11406: POC: Drafting improvements for Interactive Query

vpapavas commented on a change in pull request #11406:
URL: https://github.com/apache/kafka/pull/11406#discussion_r734548879



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1715,4 +1723,93 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) {
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
+
+        // TODO this is a hack. We ought to be able to create the serdes independent of the
+        // TODO stores and cache them in the topology.
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            return getSerdes(store);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final StateStore store = entry.getValue().getStore(storeName);
+                    if (store != null) {
+                        return getSerdes(store);
+                    }
+                }
+            }
+        }
+        // there may be no local copy of this store.
+        // This is the main reason I want to decouble serde
+        // creation from the store itself.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) {
+        if (store instanceof MeteredKeyValueStore) {
+            return ((MeteredKeyValueStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredSessionStore) {
+            return ((MeteredSessionStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredWindowStore) {
+            return ((MeteredWindowStore<K, V>) store).serdes();
+        } else {
+            throw new IllegalArgumentException("Unknown store type: " + store);
+        }
+    }
+
+    public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
+        final InteractiveQueryResult<R> result = new InteractiveQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();

Review comment:
       What is the difference between the global state stores and the state stores associated with a task?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org