You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/18 13:30:57 UTC

[GitHub] [kafka] patrickstuedi commented on a change in pull request #11406: POC: Drafting improvements for Interactive Query

patrickstuedi commented on a change in pull request #11406:
URL: https://github.com/apache/kafka/pull/11406#discussion_r730851673



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -40,64 +71,41 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsNotStartedException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.UnknownStateStoreException;
-import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ClientUtils;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.InteractiveQueryRequest;

Review comment:
       Did you check this in? Couldn't find it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.Query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.query.RawScanQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    public static <R> QueryResult<R> requireKVQuery(
+        final Query<R> query,
+        final KeyValueStore<Bytes, byte[]> kvStore) {
+        final QueryResult<R> r = StoreQueryUtils.handleKVQuery(query, kvStore);
+        r.throwIfFailure();
+        return r;
+    }
+
+    public static <R> QueryResult<R> handleKVQuery(
+        final Query<R> query,
+        final KeyValueStore<Bytes, byte[]> kvStore) {
+
+        final long start = System.nanoTime();
+        if (query instanceof RawKeyQuery) {

Review comment:
       Would it make sense to add a query type to Query, instead of doing instance checks? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.Query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.query.RawScanQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public final class StoreQueryUtils {
+
+    // make this class uninstantiable
+    private StoreQueryUtils() {
+    }
+
+    public static <R> QueryResult<R> requireKVQuery(
+        final Query<R> query,
+        final KeyValueStore<Bytes, byte[]> kvStore) {
+        final QueryResult<R> r = StoreQueryUtils.handleKVQuery(query, kvStore);
+        r.throwIfFailure();
+        return r;
+    }
+
+    public static <R> QueryResult<R> handleKVQuery(
+        final Query<R> query,
+        final KeyValueStore<Bytes, byte[]> kvStore) {
+
+        final long start = System.nanoTime();
+        if (query instanceof RawKeyQuery) {
+            final RawKeyQuery keyQuery = (RawKeyQuery) query;
+            final Bytes key = keyQuery.getKey();
+            final byte[] value = kvStore.get(key);
+            final R result = query.constructResult(value);
+            final long end = System.nanoTime();
+
+            final QueryResult<R> queryResult = QueryResult.forResult(result);
+            queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+                + "#get via StoreQueryAdapters" + " in " + (end - start) + "ns");
+            return queryResult;
+        } else if (query instanceof RawScanQuery) {
+            final KeyValueIterator<Bytes, byte[]> iterator = kvStore.all();
+            final R result = query.constructResult(iterator);
+            final long end = System.nanoTime();
+            final QueryResult<R> queryResult = QueryResult.forResult(result);
+            queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+                + "#all via StoreQueryAdapters" + " in " + (end - start) + "ns");
+            return queryResult;
+        } else {
+            return QueryResult.forUnknownQueryType(query, kvStore);
+        }
+    }
+
+    public static boolean dominates(

Review comment:
       If you decide to create a type for the version vectors, you could add that method there.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1715,4 +1723,93 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) {
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
+
+        // TODO this is a hack. We ought to be able to create the serdes independent of the
+        // TODO stores and cache them in the topology.
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            return getSerdes(store);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final StateStore store = entry.getValue().getStore(storeName);
+                    if (store != null) {
+                        return getSerdes(store);
+                    }
+                }
+            }
+        }
+        // there may be no local copy of this store.
+        // This is the main reason I want to decouble serde
+        // creation from the store itself.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) {
+        if (store instanceof MeteredKeyValueStore) {
+            return ((MeteredKeyValueStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredSessionStore) {
+            return ((MeteredSessionStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredWindowStore) {
+            return ((MeteredWindowStore<K, V>) store).serdes();
+        } else {
+            throw new IllegalArgumentException("Unknown store type: " + store);
+        }
+    }
+
+    public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such store is registered in the topology."
+            );
+        }
+        final InteractiveQueryResult<R> result = new InteractiveQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            // TODO: a global state store might need to be a different interface to accommodate
+            // TODO: multiple partitions
+            final QueryResult<R> r = store.query(request.getQuery(), Collections.emptyMap());
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final Map<String, Map<Integer, Long>> partitionsAndOffsetBounds =

Review comment:
       Can this be pulled out of the loop(s)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -119,4 +122,8 @@ default void init(final StateStoreContext context, final StateStore root) {
      * @return {@code true} if the store is open
      */
     boolean isOpen();
+
+    default <R> QueryResult<R> query(Query<R> query, Map<String, Map<Integer, Long>> offsetBound) {

Review comment:
       It might be nice to create a separate type/class (e.g., OffsetVector?) for what you have as Map<String, Map<Integer, Long>>.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -72,6 +74,11 @@ public boolean isOpen() {
         return open;
     }
 
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final Map<String, Map<Integer, Long>> offsetBound) {
+        return StoreQueryUtils.requireKVQuery(query, this);

Review comment:
       No offsetBound checks?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -105,6 +108,8 @@
     private final RocksDBMetricsRecorder metricsRecorder;
 
     protected volatile boolean open = false;
+    private StateStoreContext context;
+    private Map<String, Map<Integer, Long>> seenOffsets = new HashMap<>();

Review comment:
       Another incentive to create a type for this :-)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org