You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/11/24 05:10:39 UTC
[kafka] 01/03: POC of the KIP-796 IQv2 proposal
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 3e308374fc1289c69e28acc0a8a96fa35c3d56fd
Author: John Roesler <vv...@apache.org>
AuthorDate: Thu Oct 14 12:53:11 2021 -0500
POC of the KIP-796 IQv2 proposal
fix test
Prototype of using KeyQuery against the Windowed store
updates
---
.../org/apache/kafka/streams/KafkaStreams.java | 125 +++++-
.../apache/kafka/streams/processor/StateStore.java | 25 ++
.../kafka/streams/processor/StateStoreContext.java | 4 +-
.../apache/kafka/streams/query/FailureReason.java | 44 ++
.../streams/query/InteractiveQueryRequest.java | 157 +++++++
.../streams/query/InteractiveQueryResult.java | 72 ++++
.../org/apache/kafka/streams/query/Iterators.java | 82 ++++
.../KeyQuery.java} | 26 +-
.../org/apache/kafka/streams/query/Position.java | 192 +++++++++
.../apache/kafka/streams/query/PositionBound.java | 80 ++++
.../Query.java} | 16 +-
.../apache/kafka/streams/query/QueryResult.java | 150 +++++++
.../RawKeyQuery.java} | 32 +-
.../RawScanQuery.java} | 21 +-
.../apache/kafka/streams/state/StateSerdes.java | 9 +
.../AbstractRocksDBSegmentedBytesStore.java | 18 +
.../state/internals/InMemoryKeyValueStore.java | 12 +
.../InMemoryTimeOrderedKeyValueBuffer.java | 12 +
.../streams/state/internals/MemoryLRUCache.java | 12 +
.../state/internals/MeteredKeyValueStore.java | 41 ++
.../state/internals/MeteredSessionStore.java | 4 +
.../state/internals/MeteredWindowStore.java | 117 ++++--
.../state/internals/QueryableStoreProvider.java | 5 +
.../streams/state/internals/RocksDBStore.java | 42 ++
.../streams/state/internals/StoreQueryUtils.java | 122 ++++++
.../internals/TimestampedKeyValueStoreBuilder.java | 15 +
.../internals/TimestampedWindowStoreBuilder.java | 15 +
.../state/internals/ValueAndTimestampSerde.java | 10 +
.../WindowToTimestampedWindowByteStoreAdapter.java | 12 +
.../streams/state/internals/WrappedStateStore.java | 18 +
.../streams/integration/IQv2IntegrationTest.java | 460 +++++++++++++++++++++
.../streams/state/internals/RocksDBStoreTest.java | 3 +
32 files changed, 1867 insertions(+), 86 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bb0c40a..2356096 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -40,29 +41,38 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
-import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.InteractiveQueryRequest;
+import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredSessionStore;
+import org.apache.kafka.streams.state.internals.MeteredWindowStore;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
@@ -78,17 +88,18 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -1716,4 +1727,112 @@ public class KafkaStreams implements AutoCloseable {
return Collections.unmodifiableMap(localStorePartitionLags);
}
+
+ public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) {
+ if (!topologyMetadata.hasStore(storeName)) {
+ throw new UnknownStateStoreException(
+ "Cannot get state store " + storeName + " because no such store is registered in the topology."
+ );
+ }
+
+ // TODO this is a hack. We ought to be able to create the serdes independent of the
+ // TODO stores and cache them in the topology.
+ final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+ if (globalStateStores.containsKey(storeName)) {
+ final StateStore store = globalStateStores.get(storeName);
+ return getSerdes(store);
+ } else {
+ for (final StreamThread thread : threads) {
+ final Map<TaskId, Task> tasks = thread.allTasks();
+ for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+ final StateStore store = entry.getValue().getStore(storeName);
+ if (store != null) {
+ return getSerdes(store);
+ }
+ }
+ }
+ }
+ // there may be no local copy of this store.
+ // This is the main reason I want to decouble serde
+ // creation from the store itself.
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) {
+ if (store instanceof MeteredKeyValueStore) {
+ return ((MeteredKeyValueStore<K, V>) store).serdes();
+ } else if (store instanceof MeteredSessionStore) {
+ return ((MeteredSessionStore<K, V>) store).serdes();
+ } else if (store instanceof MeteredWindowStore) {
+ return ((MeteredWindowStore<K, V>) store).serdes();
+ } else {
+ throw new IllegalArgumentException("Unknown store type: " + store);
+ }
+ }
+
+ @Evolving
+ public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) {
+ final String storeName = request.getStoreName();
+ if (!topologyMetadata.hasStore(storeName)) {
+ throw new UnknownStateStoreException(
+ "Cannot get state store " + storeName + " because no such store is registered in the topology."
+ );
+ }
+ final InteractiveQueryResult<R> result = new InteractiveQueryResult<>(new HashMap<>());
+
+ final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
+ if (globalStateStores.containsKey(storeName)) {
+ final StateStore store = globalStateStores.get(storeName);
+ final QueryResult<R> r =
+ store.query(
+ request.getQuery(),
+ request.getPositionBound(),
+ request.executionInfoEnabled()
+ );
+ result.setGlobalResult(r);
+ } else {
+ for (final StreamThread thread : threads) {
+ final Map<TaskId, Task> tasks = thread.allTasks();
+ for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+
+ final TaskId taskId = entry.getKey();
+ final int partition = taskId.partition();
+ if (request.isAllPartitions()
+ || request.getPartitions().contains(partition)) {
+ final Task task = entry.getValue();
+ final StateStore store = task.getStore(storeName);
+ if (store != null) {
+ final StreamThread.State state = thread.state();
+ final boolean active = task.isActive();
+ if (request.isRequireActive()
+ && (state != StreamThread.State.RUNNING
+ || !active)) {
+ result.addResult(
+ partition,
+ QueryResult.notActive(
+ state,
+ active,
+ partition
+ )
+ );
+ } else {
+ final QueryResult<R> r = store.query(
+ request.getQuery(),
+ request.isRequireActive()
+ ? PositionBound.unbounded()
+ : request.getPositionBound(),
+ request.executionInfoEnabled()
+ );
+ result.addResult(partition, r);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 76d1ab4..b1c480f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -19,6 +19,10 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
/**
* A storage engine for managing state maintained by a stream processor.
@@ -119,4 +123,25 @@ public interface StateStore {
* @return {@code true} if the store is open
*/
boolean isOpen();
+
+ /**
+ * Execute a query. Returns a QueryResult containing either result data or
+ * a failure.
+ * <p>
+ * If the store doesn't know how to handle the given query, the result
+ * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+ * If the store couldn't satisfy the given position bound, the result
+ * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
+ * @param query The query to execute
+ * @param positionBound The position the store must be at or past
+ * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
+ * @param <R> The result type
+ */
+ default <R> QueryResult<R> query(
+ Query<R> query,
+ PositionBound positionBound,
+ boolean collectExecutionInfo) {
+
+ return QueryResult.forUnknownQueryType(query, this);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
index 50d5879..f6f1446 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
@@ -46,8 +46,8 @@ public interface StateStoreContext {
/**
* Return the metadata of the current topic/partition/offset if available.
- * This is defined as the metadata of the record that is currently been
- * processed by the StreamTask that holds the store.
+ * This is defined as the metadata of the record that is currently being
+ * processed (or was last processed) by the StreamTask that holds the store.
* <p>
* Note that the metadata is not defined during all store interactions, for
* example, while the StreamTask is running a punctuation.
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
new file mode 100644
index 0000000..02db7fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+public enum FailureReason {
+ /**
+ * Failure indicating that the store doesn't know how to handle the given query.
+ */
+ UNKNOWN_QUERY_TYPE,
+
+ /**
+ * Failure indicating that the store partition is not (yet) up to the desired bound.
+ * The caller should either try again later or try a different replica.
+ */
+ NOT_UP_TO_BOUND,
+
+ /**
+ * Failure indicating that the requested store partition is not present on the local
+ * KafkaStreams instance. It may have been migrated to another instance during a rebalance.
+ * The caller is recommended to try a different replica.
+ */
+ NOT_PRESENT,
+
+ /**
+ * The requested store partition does not exist at all. For example, partition 4 was requested,
+ * but the store in question only has 4 partitions (0 through 3).
+ */
+ DOES_NOT_EXIST;
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
new file mode 100644
index 0000000..56fc0a8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * @param <R>
+ */
+public class InteractiveQueryRequest<R> {
+
+ private final String storeName;
+ private final PositionBound position;
+ private final Optional<Set<Integer>> partitions;
+ private final Query<R> query;
+ private boolean executionInfoEnabled;
+ private boolean requireActive;
+
+ private InteractiveQueryRequest(
+ final String storeName,
+ final PositionBound position,
+ final Optional<Set<Integer>> partitions,
+ final Query<R> query,
+ final boolean executionInfoEnabled, final boolean requireActive) {
+
+ this.storeName = storeName;
+ this.position = position;
+ this.partitions = partitions;
+ this.query = query;
+ this.executionInfoEnabled = executionInfoEnabled;
+ this.requireActive = requireActive;
+ }
+
+ public static InStore inStore(final String name) {
+ return new InStore(name);
+ }
+
+ public InteractiveQueryRequest<R> withPositionBound(final PositionBound positionBound) {
+ return new InteractiveQueryRequest<>(
+ storeName,
+ positionBound,
+ partitions,
+ query,
+ executionInfoEnabled,
+ false);
+ }
+
+
+ public InteractiveQueryRequest<R> withNoPartitions() {
+ return new InteractiveQueryRequest<>(storeName,
+ position,
+ Optional.of(Collections.emptySet()),
+ query,
+ executionInfoEnabled,
+ requireActive);
+ }
+
+ public InteractiveQueryRequest<R> withAllPartitions() {
+ return new InteractiveQueryRequest<>(storeName,
+ position,
+ Optional.empty(),
+ query,
+ executionInfoEnabled,
+ requireActive);
+ }
+
+ public InteractiveQueryRequest<R> withPartitions(final Set<Integer> partitions) {
+ return new InteractiveQueryRequest<>(storeName,
+ position,
+ Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))),
+ query,
+ executionInfoEnabled,
+ requireActive);
+ }
+
+ public String getStoreName() {
+ return storeName;
+ }
+
+ public PositionBound getPositionBound() {
+ if (requireActive) {
+ throw new IllegalArgumentException();
+ }
+ return Objects.requireNonNull(position);
+ }
+
+ public Query<R> getQuery() {
+ return query;
+ }
+
+ public boolean isAllPartitions() {
+ return !partitions.isPresent();
+ }
+
+ public Set<Integer> getPartitions() {
+ if (!partitions.isPresent()) {
+ throw new UnsupportedOperationException(
+ "Cannot list partitions of an 'all partitions' request");
+ } else {
+ return partitions.get();
+ }
+ }
+
+ public InteractiveQueryRequest<R> enableExecutionInfo() {
+ return new InteractiveQueryRequest<>(storeName,
+ position,
+ partitions,
+ query,
+ true,
+ requireActive);
+ }
+
+ public boolean executionInfoEnabled() {
+ return executionInfoEnabled;
+ }
+
+ public boolean isRequireActive() {
+ return requireActive;
+ }
+
+ public static class InStore {
+
+ private final String name;
+
+ private InStore(final String name) {
+ this.name = name;
+ }
+
+ public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query) {
+ return new InteractiveQueryRequest<>(
+ name,
+ null,
+ Optional.empty(),
+ query,
+ false,
+ true);
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
new file mode 100644
index 0000000..4f8e728
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class InteractiveQueryResult<R> {
+
+ private final Map<Integer, QueryResult<R>> partitionResults;
+
+ public InteractiveQueryResult(final Map<Integer, QueryResult<R>> resultMap) {
+ partitionResults = resultMap;
+ }
+
+ public void setGlobalResult(final QueryResult<R> r) {
+
+ }
+
+ public void addResult(final int partition, final QueryResult<R> r) {
+ partitionResults.put(partition, r);
+ }
+
+ public Map<Integer, QueryResult<R>> getPartitionResults() {
+ return partitionResults;
+ }
+
+ public QueryResult<R> getOnlyPartitionResult() {
+ final List<QueryResult<R>> nonempty =
+ partitionResults
+ .values()
+ .stream()
+ .filter(r -> r.getResult() != null)
+ .collect(Collectors.toList());
+
+ if (nonempty.size() != 1) {
+ throw new IllegalStateException();
+ } else {
+ return nonempty.get(0);
+ }
+ }
+
+ public Position getPosition() {
+ Position position = Position.emptyPosition();
+ for (final QueryResult<R> r : partitionResults.values()) {
+ position = position.merge(r.getPosition());
+ }
+ return position;
+ }
+
+ @Override
+ public String toString() {
+ return "InteractiveQueryResult{" +
+ "partitionResults=" + partitionResults +
+ '}';
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java b/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java
new file mode 100644
index 0000000..d7ea7e9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.utils.CloseableIterator;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Objects;
+
+public final class Iterators {
+
+ private Iterators() {
+ }
+
+ public static <E, I extends Iterator<E>> CloseableIterator<E> collate(final Collection<I> iterators) {
+ return new CloseableIterator<E>() {
+ private final Deque<I> iteratorQueue = new LinkedList<>(iterators);
+
+ @Override
+ public void close() {
+ RuntimeException exception = null;
+ for (final I iterator : iterators) {
+ if (iterator instanceof Closeable) {
+ try {
+ ((Closeable) iterator).close();
+ } catch (final IOException e) {
+ if (exception == null) {
+ exception = new RuntimeException(
+ "Exception closing collated iterator", e);
+ } else {
+ exception.addSuppressed(e);
+ }
+ }
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ for (int i = 0; i < iterators.size(); i++) {
+ final Iterator<E> iterator = Objects.requireNonNull(iteratorQueue.peek());
+ if (iterator.hasNext()) {
+ return true;
+ } else {
+ iteratorQueue.push(iteratorQueue.poll());
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public E next() {
+ final I iterator = iteratorQueue.poll();
+ final E next = Objects.requireNonNull(iterator).next();
+ iteratorQueue.push(iterator);
+ return next;
+ }
+ };
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
similarity index 56%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
index 1936d29..3900739 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
@@ -14,19 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+public class KeyQuery<K, V> implements Query<V> {
-import static java.util.Objects.requireNonNull;
+ private final K key;
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
- public ValueAndTimestampSerde(final Serde<V> valueSerde) {
- super(
- new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
- new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
- );
+ private KeyQuery(final K key) {
+ this.key = key;
}
-}
\ No newline at end of file
+
+ public static <K, V> KeyQuery<K, V> withKey(final K key) {
+ return new KeyQuery<>(key);
+ }
+
+ public K getKey() {
+ return key;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
new file mode 100644
index 0000000..5b0e981
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+public class Position {
+
+ private final Map<String, Map<Integer, Long>> position;
+
+ private Position(final Map<String, Map<Integer, Long>> position) {
+ this.position = position;
+ }
+
+ public static Position emptyPosition() {
+ return new Position(new HashMap<>());
+ }
+
+ public static Position fromMap(final Map<String, Map<Integer, Long>> map) {
+ return new Position(deepCopy(map));
+ }
+
+ public Position withComponent(final String topic, final int partition, final long offset) {
+ final Map<String, Map<Integer, Long>> updated = deepCopy(position);
+ updated.computeIfAbsent(topic, k -> new HashMap<>()).put(partition, offset);
+ return new Position(updated);
+ }
+
+ public Position merge(final Position other) {
+ if (other == null) {
+ return this;
+ } else {
+ final Map<String, Map<Integer, Long>> copy = deepCopy(position);
+ for (final Entry<String, Map<Integer, Long>> entry : other.position.entrySet()) {
+ final String topic = entry.getKey();
+ final Map<Integer, Long> partitionMap =
+ copy.computeIfAbsent(topic, k -> new HashMap<>());
+ for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
+ final Integer partition = partitionOffset.getKey();
+ final Long offset = partitionOffset.getValue();
+ if (!partitionMap.containsKey(partition)
+ || partitionMap.get(partition) < offset) {
+ partitionMap.put(partition, offset);
+ }
+ }
+ }
+ return new Position(copy);
+ }
+ }
+
+ public Set<String> getTopics() {
+ return Collections.unmodifiableSet(position.keySet());
+ }
+
+ public Map<Integer, Long> getBound(final String topic) {
+ return Collections.unmodifiableMap(position.get(topic));
+ }
+
+ public ByteBuffer serialize() {
+ final byte version = (byte) 0;
+
+ int arraySize = Byte.SIZE; // version
+
+ final int nTopics = position.size();
+ arraySize += Integer.SIZE;
+
+ final ArrayList<Entry<String, Map<Integer, Long>>> entries =
+ new ArrayList<>(position.entrySet());
+ final byte[][] topics = new byte[entries.size()][];
+
+ for (int i = 0; i < nTopics; i++) {
+ final Entry<String, Map<Integer, Long>> entry = entries.get(i);
+ final byte[] topicBytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
+ topics[i] = topicBytes;
+ arraySize += Integer.SIZE; // topic name length
+ arraySize += topicBytes.length; // topic name itself
+
+ final Map<Integer, Long> partitionOffsets = entry.getValue();
+ arraySize += Integer.SIZE; // Number of PartitionOffset pairs
+ arraySize += (Integer.SIZE + Long.SIZE)
+ * partitionOffsets.size(); // partitionOffsets themselves
+ }
+
+ final ByteBuffer buffer = ByteBuffer.allocate(arraySize);
+ buffer.put(version);
+
+ buffer.putInt(nTopics);
+ for (int i = 0; i < nTopics; i++) {
+ buffer.putInt(topics[i].length);
+ buffer.put(topics[i]);
+
+ final Entry<String, Map<Integer, Long>> entry = entries.get(i);
+ final Map<Integer, Long> partitionOffsets = entry.getValue();
+ buffer.putInt(partitionOffsets.size());
+ for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
+ buffer.putInt(partitionOffset.getKey());
+ buffer.putLong(partitionOffset.getValue());
+ }
+ }
+
+ buffer.flip();
+ return buffer;
+ }
+
+ public static Position deserialize(final ByteBuffer buffer) {
+ final byte version = buffer.get();
+
+ switch (version) {
+ case (byte) 0:
+ final int nTopics = buffer.getInt();
+ final Map<String, Map<Integer, Long>> position = new HashMap<>(nTopics);
+ for (int i = 0; i < nTopics; i++) {
+ final int topicNameLength = buffer.getInt();
+ final byte[] topicNameBytes = new byte[topicNameLength];
+ buffer.get(topicNameBytes);
+ final String topic = new String(topicNameBytes, StandardCharsets.UTF_8);
+
+ final int numPairs = buffer.getInt();
+ final Map<Integer, Long> partitionOffsets = new HashMap<>(numPairs);
+ for (int j = 0; j < numPairs; j++) {
+ partitionOffsets.put(buffer.getInt(), buffer.getLong());
+ }
+ position.put(topic, partitionOffsets);
+ }
+ return Position.fromMap(position);
+ default:
+ throw new IllegalArgumentException(
+ "Unknown version " + version + " when deserializing Position"
+ );
+ }
+ }
+
+ private static Map<String, Map<Integer, Long>> deepCopy(
+ final Map<String, Map<Integer, Long>> map) {
+ if (map == null) {
+ return new HashMap<>();
+ } else {
+ final Map<String, Map<Integer, Long>> copy = new HashMap<>(map.size());
+ for (final Entry<String, Map<Integer, Long>> entry : map.entrySet()) {
+ copy.put(entry.getKey(), new HashMap<>(entry.getValue()));
+ }
+ return copy;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Position{" +
+ "position=" + position +
+ '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Position position1 = (Position) o;
+ return Objects.equals(position, position1.position);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(position);
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
new file mode 100644
index 0000000..a4dbe35
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import java.util.Objects;
+
+public class PositionBound {
+
+ private final Position position;
+ private final boolean unbounded;
+
+ private PositionBound(final Position position, final boolean unbounded) {
+ if (unbounded && position != null) {
+ throw new IllegalArgumentException();
+ }
+ this.position = position;
+ this.unbounded = unbounded;
+ }
+
+ public static PositionBound unbounded() {
+ return new PositionBound(null, true);
+ }
+
+ public static PositionBound at(final Position position) {
+ return new PositionBound(position, false);
+ }
+
+ public boolean isUnbounded() {
+ return unbounded;
+ }
+
+ public Position position() {
+ if (unbounded) {
+ throw new IllegalArgumentException();
+ } else {
+ return position;
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (isUnbounded()) {
+ return "PositionBound{unbounded}";
+ } else {
+ return "PositionBound{position=" + position + '}';
+ }
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final PositionBound that = (PositionBound) o;
+ return unbounded == that.unbounded && Objects.equals(position, that.position);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(position, unbounded);
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/Query.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/Query.java
index 1936d29..988f904 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Query.java
@@ -14,19 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import static java.util.Objects.requireNonNull;
+public interface Query<R> {
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
- public ValueAndTimestampSerde(final Serde<V> valueSerde) {
- super(
- new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
- new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
- );
- }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
new file mode 100644
index 0000000..f3b92d6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StreamThread.State;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public final class QueryResult<R> {
+
+ private final List<String> executionInfo = new LinkedList<>();
+ private final FailureReason failureReason;
+ private final String failure;
+ private final R result;
+ private Position boundUpdate;
+
+ private QueryResult(final R result) {
+ this.result = result;
+ this.failureReason = null;
+ this.failure = null;
+ }
+
+ private QueryResult(final FailureReason failureReason, final String failure) {
+ this.result = null;
+ this.failureReason = failureReason;
+ this.failure = failure;
+ }
+
+ public static <R> QueryResult<R> forResult(final R result) {
+ return new QueryResult<>(result);
+ }
+
+ public static <R> QueryResult<R> forUnknownQueryType(
+ final Query<R> query,
+ final StateStore store) {
+
+ return new QueryResult<>(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + store.getClass() + ") doesn't know how to execute "
+ + "the given query (" + query + ")." +
+ " Contact the store maintainer if you need support for a new query type.");
+ }
+
+ public static <R> QueryResult<R> notUpToBound(
+ final Position currentPosition,
+ final PositionBound positionBound,
+ final int partition) {
+
+ return new QueryResult<>(
+ FailureReason.NOT_UP_TO_BOUND,
+ "For store partition " + partition + ", the current position "
+ + currentPosition + " is not yet up to the bound "
+ + positionBound
+ );
+ }
+
+ public static <R> QueryResult<R> notActive(
+ final State state,
+ final boolean active,
+ final int partition) {
+ return new QueryResult<>(
+ FailureReason.NOT_UP_TO_BOUND,
+ "Query requires a running active task,"
+ + " but partition " + partition + " was in state " + state + " and was "
+ + (active ? "active" : "not active") + "."
+ );
+ }
+
+
+ public <NewR> QueryResult<NewR> swapResult(final NewR typedResult) {
+ final QueryResult<NewR> queryResult = new QueryResult<>(typedResult);
+ queryResult.executionInfo.addAll(executionInfo);
+ queryResult.boundUpdate = boundUpdate;
+ return queryResult;
+ }
+
+ public void addExecutionInfo(final String s) {
+ executionInfo.add(s);
+ }
+
+ public void throwIfFailure() {
+ if (isFailure()) {
+ throw new RuntimeException(failureReason.name() + ": " + failure);
+ }
+ }
+
+ public boolean isSuccess() {
+ return failureReason == null;
+ }
+
+ public boolean isFailure() {
+ return failureReason != null;
+ }
+
+ public List<String> getExecutionInfo() {
+ return executionInfo;
+ }
+
+ public FailureReason getFailureReason() {
+ return failureReason;
+ }
+
+ public String getFailure() {
+ return failure;
+ }
+
+ public R getResult() {
+ if (result == null) {
+ throwIfFailure();
+ }
+ // will return `null` if there's not a failure recorded.
+ return result;
+ }
+
+ public void setPosition(final Position boundUpdate) {
+ this.boundUpdate = boundUpdate;
+ }
+
+ public Position getPosition() {
+ return boundUpdate;
+ }
+
+ @Override
+ public String toString() {
+ return "QueryResult{" +
+ "executionInfo=" + executionInfo +
+ ", failureReason=" + failureReason +
+ ", failure='" + failure + '\'' +
+ ", result=" + result +
+ ", boundUpdate=" + boundUpdate +
+ '}';
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
index 1936d29..c42762e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
@@ -14,19 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.common.utils.Bytes;
-import static java.util.Objects.requireNonNull;
+public class RawKeyQuery implements Query<byte[]> {
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
- public ValueAndTimestampSerde(final Serde<V> valueSerde) {
- super(
- new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
- new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
- );
+ private final Bytes key;
+
+ private RawKeyQuery(final Bytes key) {
+ this.key = key;
+ }
+
+ public static RawKeyQuery withKey(final Bytes key) {
+ return new RawKeyQuery(key);
+ }
+
+ public static RawKeyQuery withKey(final byte[] key) {
+ return new RawKeyQuery(Bytes.wrap(key));
+ }
+
+ public Bytes getKey() {
+ return key;
}
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
similarity index 53%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
copy to streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
index 1936d29..d1f14f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java
@@ -14,19 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.streams.query;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
-import static java.util.Objects.requireNonNull;
+public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
-public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
- public ValueAndTimestampSerde(final Serde<V> valueSerde) {
- super(
- new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
- new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
- );
+ private RawScanQuery() {}
+
+ public static RawScanQuery scan() {
+ return new RawScanQuery();
}
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index f9f0bdc..da7927e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -212,4 +212,13 @@ public final class StateSerdes<K, V> {
e);
}
}
+
+ @Override
+ public String toString() {
+ return "StateSerdes{" +
+ "topic='" + topic + '\'' +
+ ", keySerde=" + keySerde +
+ ", valueSerde=" + valueSerde +
+ '}';
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index bfee6b2..c1f7461 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -27,6 +27,10 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -279,6 +283,20 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
return open;
}
+ @Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+ if (query instanceof RawKeyQuery) {
+ final Bytes key = ((RawKeyQuery) query).getKey();
+ final byte[] bytes = get(key);
+ return QueryResult.forResult((R) bytes);
+ } else {
+ return QueryResult.forUnknownQueryType(query, this);
+ }
+ }
+
// Visible for testing
List<S> getSegments() {
return segments.allSegments(false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index f0c6dbe..f9bc34e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
@@ -73,6 +76,15 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
}
@Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ return StoreQueryUtils.requireKVQuery(query, this, collectExecutionInfo);
+ }
+
+ @Override
public synchronized byte[] get(final Bytes key) {
return map.get(key);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index ba8a745..e13b357 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -40,6 +40,9 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult;
@@ -240,6 +243,15 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
}
@Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ return QueryResult.forUnknownQueryType(query, this);
+ }
+
+ @Override
public void close() {
open = false;
index.clear();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 22f1215..1bea349 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -22,6 +22,9 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -108,6 +111,15 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
}
@Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ return StoreQueryUtils.requireKVQuery(query, this, collectExecutionInfo);
+ }
+
+ @Override
public synchronized byte[] get(final Bytes key) {
Objects.requireNonNull(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 937288c..21b8e38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -34,6 +34,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
@@ -186,6 +191,38 @@ public class MeteredKeyValueStore<K, V>
return false;
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ final long start = System.nanoTime();
+ final QueryResult<R> result;
+
+ if (query instanceof KeyQuery) {
+ final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+ final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(keyBytes(typedQuery.getKey()));
+ final QueryResult<byte[]> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+ if (rawResult.isSuccess()) {
+ final V value = outerValue(rawResult.getResult());
+ final QueryResult<V> typedQueryResult =
+ rawResult.swapResult(value);
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ } else {
+ result = wrapped().query(query, positionBound, collectExecutionInfo);
+ }
+ final long end = System.nanoTime();
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " with serdes " + serdes + " in " + (end - start) + "ns");
+ return result;
+ }
+
@Override
public V get(final K key) {
Objects.requireNonNull(key, "key cannot be null");
@@ -324,6 +361,10 @@ public class MeteredKeyValueStore<K, V>
}
}
+ public StateSerdes<K, V> serdes() {
+ return serdes;
+ }
+
private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
private final KeyValueIterator<Bytes, byte[]> iter;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index b1eb948..87b3b40 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -372,4 +372,8 @@ public class MeteredSessionStore<K, V>
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
+
+ public StateSerdes<K, V> serdes() {
+ return serdes;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 0970703..1f267bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -33,6 +33,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
@@ -63,11 +68,11 @@ public class MeteredWindowStore<K, V>
private TaskId taskId;
MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
- final long windowSizeMs,
- final String metricsScope,
- final Time time,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ final long windowSizeMs,
+ final String metricsScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
super(inner);
this.windowSizeMs = windowSizeMs;
this.metricsScope = metricsScope;
@@ -87,7 +92,8 @@ public class MeteredWindowStore<K, V>
registerMetrics();
final Sensor restoreSensor =
- StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+ StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(),
+ streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
@@ -103,20 +109,26 @@ public class MeteredWindowStore<K, V>
registerMetrics();
final Sensor restoreSensor =
- StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+ StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(),
+ streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
+
protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final SerdeGetter getter) {
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
}
private void registerMetrics() {
- putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
- fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
- flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
- e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
+ putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(),
+ streamsMetrics);
+ fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(),
+ streamsMetrics);
+ flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(),
+ streamsMetrics);
+ e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope,
+ name(), streamsMetrics);
}
@Deprecated
@@ -126,7 +138,8 @@ public class MeteredWindowStore<K, V>
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName,
+ taskId.topologyName()),
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerde(valueSerde, new SerdeGetter(context)));
}
@@ -137,7 +150,8 @@ public class MeteredWindowStore<K, V>
serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()),
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName,
+ taskId.topologyName()),
prepareKeySerde(keySerde, new SerdeGetter(context)),
prepareValueSerde(valueSerde, new SerdeGetter(context)));
}
@@ -145,7 +159,7 @@ public class MeteredWindowStore<K, V>
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
- final boolean sendOldValues) {
+ final boolean sendOldValues) {
final WindowStore<Bytes, byte[]> wrapped = wrapped();
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
@@ -161,10 +175,43 @@ public class MeteredWindowStore<K, V>
return false;
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo
+ ) {
+ if (query instanceof KeyQuery) {
+ final Windowed<K> key = ((KeyQuery<Windowed<K>, V>) query).getKey();
+ final Bytes bytes = keyBytes(key.key());
+ // NOTE: we need to _fully_ serialize the key, since we can't pass in any
+ // extra timestamp information in the RawKeyQuery. So, we go ahead and use the
+ // internal store's binary schema. This works for the provided KS windowed stores,
+ // but a custom store that uses a different schema will have to use the WindowKeySchema
+ // to read the data back out of the array and convert it to whatever the real binary
+ // key is.
+ // seqnum hard-coded to zero since we don't query stream-stream join stores.
+ final Bytes storeKey = WindowKeySchema.toStoreKeyBinary(
+ bytes,
+ key.window().start(),
+ 0
+ );
+ final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(storeKey);
+ final QueryResult<byte[]> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+ final V v = serdes.valueFrom(rawResult.getResult());
+ final QueryResult<V> result = rawResult.swapResult(v);
+ return (QueryResult<R>) result;
+ } else {
+ return super.query(query, positionBound, collectExecutionInfo);
+ }
+ }
+
@Override
public void put(final K key,
- final V value,
- final long windowStartTimestamp) {
+ final V value,
+ final long windowStartTimestamp) {
Objects.requireNonNull(key, "key cannot be null");
try {
maybeMeasureLatency(
@@ -181,7 +228,7 @@ public class MeteredWindowStore<K, V>
@Override
public V fetch(final K key,
- final long timestamp) {
+ final long timestamp) {
Objects.requireNonNull(key, "key cannot be null");
return maybeMeasureLatency(
() -> {
@@ -198,8 +245,8 @@ public class MeteredWindowStore<K, V>
@Override
public WindowStoreIterator<V> fetch(final K key,
- final long timeFrom,
- final long timeTo) {
+ final long timeFrom,
+ final long timeTo) {
Objects.requireNonNull(key, "key cannot be null");
return new MeteredWindowStoreIterator<>(
wrapped().fetch(keyBytes(key), timeFrom, timeTo),
@@ -212,8 +259,8 @@ public class MeteredWindowStore<K, V>
@Override
public WindowStoreIterator<V> backwardFetch(final K key,
- final long timeFrom,
- final long timeTo) {
+ final long timeFrom,
+ final long timeTo) {
Objects.requireNonNull(key, "key cannot be null");
return new MeteredWindowStoreIterator<>(
wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo),
@@ -226,9 +273,9 @@ public class MeteredWindowStore<K, V>
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
- final K keyTo,
- final long timeFrom,
- final long timeTo) {
+ final K keyTo,
+ final long timeFrom,
+ final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().fetch(
keyBytes(keyFrom),
@@ -243,9 +290,9 @@ public class MeteredWindowStore<K, V>
@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
- final K keyTo,
- final long timeFrom,
- final long timeTo) {
+ final K keyTo,
+ final long timeFrom,
+ final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFetch(
keyBytes(keyFrom),
@@ -260,7 +307,7 @@ public class MeteredWindowStore<K, V>
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
- final long timeTo) {
+ final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().fetchAll(timeFrom, timeTo),
fetchSensor,
@@ -271,7 +318,7 @@ public class MeteredWindowStore<K, V>
@Override
public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom,
- final long timeTo) {
+ final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFetchAll(timeFrom, timeTo),
fetchSensor,
@@ -282,12 +329,14 @@ public class MeteredWindowStore<K, V>
@Override
public KeyValueIterator<Windowed<K>, V> all() {
- return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
+ return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics,
+ serdes, time);
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardAll() {
- return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor, streamsMetrics, serdes, time);
+ return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor,
+ streamsMetrics, serdes, time);
}
@Override
@@ -313,8 +362,12 @@ public class MeteredWindowStore<K, V>
// In that case, we _can't_ get the current timestamp, so we don't record anything.
if (e2eLatencySensor.shouldRecord() && context != null) {
final long currentTime = time.milliseconds();
- final long e2eLatency = currentTime - context.timestamp();
+ final long e2eLatency = currentTime - context.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
+
+ public StateSerdes<K, V> serdes() {
+ return serdes;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 07cf0ee..aedb67c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -71,4 +72,8 @@ public class QueryableStoreProvider {
public void removeStoreProviderForThread(final String threadName) {
this.storeProviders.remove(threadName);
}
+
+ public Collection<StreamThreadStateStoreProvider> getStoreProviders() {
+ return storeProviders.values();
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index aa1b1ba..e9ea595 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -28,6 +28,11 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -61,6 +66,7 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -105,6 +111,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
private final RocksDBMetricsRecorder metricsRecorder;
protected volatile boolean open = false;
+ private StateStoreContext context;
+ private Map<String, Map<Integer, Long>> seenOffsets = new HashMap<>();
RocksDBStore(final String name,
final String metricsScope) {
@@ -252,6 +260,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
context.register(root, new RocksDBBatchingRestoreCallback(this));
+ this.context = context;
}
@Override
@@ -269,6 +278,29 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
return open;
}
+ @Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ if (context == null) {
+ throw new IllegalStateException("Store is not yet initialized");
+ } else {
+ final int partition = this.context.taskId().partition();
+ if (StoreQueryUtils.isPermitted(seenOffsets, positionBound, partition)) {
+ final QueryResult<R> result = StoreQueryUtils.requireKVQuery(query, this,
+ collectExecutionInfo);
+ final Position currentPosition = Position.fromMap(seenOffsets);
+ result.setPosition(currentPosition);
+ return result;
+ } else {
+ return QueryResult.notUpToBound(Position.fromMap(seenOffsets), positionBound,
+ partition);
+ }
+ }
+ }
+
private void validateStoreOpen() {
if (!open) {
throw new InvalidStateStoreException("Store " + name + " is currently closed");
@@ -281,6 +313,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
dbAccessor.put(key.get(), value);
+ // FIXME record metadata can be null because when this store is used as a Segment,
+ // we never call init(). Is that correct?
+ // to make this logic work properly for segmented stores, we either need to
+ // track the seen offsets one level up (in the RocksDBSegmentedBytesStore) OR
+ // we need to get a reference to the context here.
+ if (context != null && context.recordMetadata().isPresent()) {
+ final RecordMetadata meta = context.recordMetadata().get();
+ seenOffsets.computeIfAbsent(meta.topic(), t -> new HashMap<>())
+ .put(meta.partition(), meta.offset());
+ }
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
new file mode 100644
index 0000000..603257c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Map;
+
+public final class StoreQueryUtils {
+
+ // make this class uninstantiable
+ private StoreQueryUtils() {
+ }
+
+ public static <R> QueryResult<R> requireKVQuery(
+ final Query<R> query,
+ final KeyValueStore<Bytes, byte[]> kvStore,
+ final boolean enableExecutionInfo) {
+ final QueryResult<R> r = StoreQueryUtils.handleKVQuery(query, kvStore, enableExecutionInfo);
+ r.throwIfFailure();
+ return r;
+ }
+
+ public static <R> QueryResult<R> handleKVQuery(
+ final Query<R> query,
+ final KeyValueStore<Bytes, byte[]> kvStore,
+ final boolean enableExecutionInfo) {
+
+ final long start = System.nanoTime();
+ final String name = query.getClass().getCanonicalName();
+ switch (name) {
+ case "org.apache.kafka.streams.query.RawKeyQuery": {
+ final RawKeyQuery keyQuery = (RawKeyQuery) query;
+ return handleRawKeyQuery(kvStore, enableExecutionInfo, start, keyQuery);
+ }
+ case "org.apache.kafka.streams.query.RawScanQuery": {
+ final KeyValueIterator<Bytes, byte[]> iterator = kvStore.all();
+ @SuppressWarnings("unchecked") final R result = (R) iterator;
+ final long end = System.nanoTime();
+ final QueryResult<R> queryResult = QueryResult.forResult(result);
+ if (enableExecutionInfo) {
+ queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+ + "#all via StoreQueryAdapters" + " in " + (end - start) + "ns");
+ }
+ return queryResult;
+ }
+ default:
+ return QueryResult.forUnknownQueryType(query, kvStore);
+ }
+ }
+
+ public static <R> QueryResult<R> handleRawKeyQuery(
+ final KeyValueStore<Bytes, byte[]> kvStore,
+ final boolean enableExecutionInfo,
+ final long start,
+ final RawKeyQuery keyQuery) {
+
+ final Bytes key = keyQuery.getKey();
+ final byte[] value = kvStore.get(key);
+ @SuppressWarnings("unchecked") final R result = (R) value;
+ final long end = System.nanoTime();
+
+ final QueryResult<R> queryResult = QueryResult.forResult(result);
+ if (enableExecutionInfo) {
+ queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+ + "#get via StoreQueryAdapters" + " in " + (end - start) + "ns");
+ }
+ return queryResult;
+ }
+
+ public static boolean isPermitted(
+ final Map<String, Map<Integer, Long>> seenOffsets,
+ final PositionBound positionBound,
+ final int partition) {
+ if (positionBound.isUnbounded()) {
+ return true;
+ } else {
+ final Position position = positionBound.position();
+ for (final String topic : position.getTopics()) {
+ final Map<Integer, Long> partitionBounds = position.getBound(topic);
+ final Map<Integer, Long> seenPartitionBounds = seenOffsets.get(topic);
+ if (!partitionBounds.containsKey(partition)) {
+ // this topic isn't bounded for our partition, so just skip over it.
+ } else {
+ if (seenPartitionBounds == null) {
+ // we haven't seen a topic that is bounded for our partition
+ return false;
+ } else if (!seenPartitionBounds.containsKey(partition)) {
+ // we haven't seen a partition that we have a bound for
+ return false;
+ } else if (seenPartitionBounds.get(partition) < partitionBounds.get(
+ partition)) {
+ // our current position is behind the bound
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index a249a14..f4eb55e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -185,6 +188,18 @@ public class TimestampedKeyValueStoreBuilder<K, V>
}
@Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ final long start = System.nanoTime();
+ final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+ final long end = System.nanoTime();
+ result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+ return result;
+ }
+
+ @Override
public String name() {
return wrapped.name();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index b3727f5..9b28ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -203,6 +206,18 @@ public class TimestampedWindowStoreBuilder<K, V>
}
@Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ final long start = System.nanoTime();
+ final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+ final long end = System.nanoTime();
+ result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+ return result;
+ }
+
+ @Override
public String name() {
return wrapped.name();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index 1936d29..599b6e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -23,10 +23,20 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import static java.util.Objects.requireNonNull;
public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+ private final Serde<V> valueSerde;
+
public ValueAndTimestampSerde(final Serde<V> valueSerde) {
super(
new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
);
+ this.valueSerde = valueSerde;
+ }
+
+ @Override
+ public String toString() {
+ return "ValueAndTimestampSerde{" +
+ "valueSerde=" + valueSerde +
+ '}';
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index f7999d3..2b626cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -21,6 +21,9 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -183,6 +186,15 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
return store.isOpen();
}
+ @Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ return QueryResult.forUnknownQueryType(query, this);
+ }
+
private static class WindowToTimestampedWindowIteratorAdapter
extends KeyValueToTimestampedKeyValueIteratorAdapter<Long>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index e8244f7..e904a48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -20,6 +20,9 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.TimestampedBytesStore;
/**
@@ -103,6 +106,21 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
wrapped.close();
}
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+ final long start = System.nanoTime();
+ final QueryResult<R> result = wrapped().query(query, positionBound, collectExecutionInfo);
+ if (collectExecutionInfo) {
+ final long end = System.nanoTime();
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " via WrappedStateStore" + " in " + (end - start)
+ + "ns");
+ }
+ return result;
+ }
+
public S wrapped() {
return wrapped;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
new file mode 100644
index 0000000..23374f3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.InteractiveQueryRequest;
+import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.Iterators;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.query.RawScanQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.query.InteractiveQueryRequest.inStore;
+import static org.apache.kafka.streams.query.PositionBound.at;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThrows;
+
+@Category({IntegrationTest.class})
+public class IQv2IntegrationTest {
+
+ private static final int NUM_BROKERS = 1;
+ private static int port = 0;
+ private static final String INPUT_TOPIC_NAME = "input-topic";
+ private static final String INPUT2_TOPIC_NAME = "input2-topic";
+ private static final String UNCACHED_TABLE = "uncached-table";
+ private static final String UNCACHED_COUNTS_TABLE = "uncached-counts-table";
+ private static final String CACHED_TABLE = "cached-table";
+
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+ @Rule
+ public TestName testName = new TestName();
+ private static KafkaStreams kafkaStreams;
+
+ @BeforeClass
+ public static void before() throws InterruptedException, IOException {
+ CLUSTER.start();
+ CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1);
+ CLUSTER.createTopic(INPUT2_TOPIC_NAME, 2, 1);
+
+ final Semaphore semaphore = new Semaphore(0);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder
+ .table(
+ INPUT_TOPIC_NAME,
+ Consumed.with(Serdes.Integer(), Serdes.Integer()),
+ Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(UNCACHED_TABLE)
+ .withCachingDisabled()
+ )
+ .filter(
+ (a, b) -> true,
+ Materialized.as(CACHED_TABLE)
+ )
+ .toStream()
+ .peek((k, v) -> semaphore.release());
+
+ builder
+ .stream(
+ INPUT2_TOPIC_NAME,
+ Consumed.with(Serdes.Integer(), Serdes.Integer())
+ )
+ .groupByKey()
+ .windowedBy(TimeWindows.ofSizeAndGrace(
+ Duration.ofMillis(100),
+ Duration.ZERO
+ ))
+ .count(
+ Materialized
+ .<Integer, Long, WindowStore<Bytes, byte[]>>as(UNCACHED_COUNTS_TABLE)
+ .withCachingDisabled()
+ )
+ .toStream()
+ .peek((k, v) -> semaphore.release());
+
+ kafkaStreams =
+ IntegrationTestUtils.getRunningStreams(streamsConfiguration(), builder, true);
+
+ final Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ INPUT_TOPIC_NAME,
+ Arrays.asList(new KeyValue<>(1, 1), new KeyValue<>(2, 2), new KeyValue<>(3, 3)),
+ producerProps,
+ Time.SYSTEM
+ );
+ // Assert that all messages in the first batch were processed in a timely manner
+ assertThat(semaphore.tryAcquire(3, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+ IntegrationTestUtils.produceSynchronously(
+ producerProps,
+ false,
+ INPUT2_TOPIC_NAME,
+ Optional.empty(),
+ Arrays.asList(
+ new KeyValueTimestamp<>(1, 1, 0),
+ new KeyValueTimestamp<>(1, 1, 10)
+ )
+ );
+
+ // Assert that we processed the second batch (should see both updates, since caching is disabled)
+ assertThat(semaphore.tryAcquire(2, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+
+ }
+
+ @AfterClass
+ public static void after() {
+ kafkaStreams.close(Duration.of(1, ChronoUnit.MINUTES));
+ kafkaStreams.cleanUp();
+ CLUSTER.stop();
+ }
+
+ @Test
+ public void shouldQueryKeyFromCachedTable() {
+
+ final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+ kafkaStreams.serdesForStore(CACHED_TABLE);
+
+ final byte[] rawKey = serdes.rawKey(1);
+ final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+ inStore(CACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
+
+ System.out.println("|||" + result);
+ final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+ final ValueAndTimestamp<Integer> value =
+ serdes.valueFrom(rawValueResult.getResult());
+ System.out.println("|||" + value);
+
+ assertThat(value.value(), is(1));
+ assertThat(result.getPosition(),
+ is(Position.fromMap(
+ mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+ }
+
+ @Test
+ public void shouldQueryKeyFromUncachedTable() {
+
+ final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+ kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+ final byte[] rawKey = serdes.rawKey(1);
+ final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+ inStore(UNCACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
+
+ System.out.println("|||" + result);
+ final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+ final ValueAndTimestamp<Integer> value =
+ serdes.valueFrom(rawValueResult.getResult());
+ System.out.println("|||" + value);
+
+ assertThat(value.value(), is(1));
+ assertThat(result.getPosition(),
+ is(Position.fromMap(
+ mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+ }
+
+ @Test
+ public void shouldQueryTypedKeyFromUncachedTable() {
+ final Integer key = 1;
+
+ final InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
+ inStore(UNCACHED_TABLE).withQuery(KeyQuery.withKey(key));
+
+ final InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);
+
+ final ValueAndTimestamp<Integer> value = result.getOnlyPartitionResult().getResult();
+
+ assertThat(value.value(), is(1));
+ assertThat(result.getPosition(),
+ is(Position.fromMap(
+ mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+ }
+
+ @Test
+ public void exampleKeyQueryIntoWindowStore() {
+ final Windowed<Integer> key = new Windowed<>(1, new TimeWindow(0L, 99L));
+
+ final InteractiveQueryRequest<ValueAndTimestamp<Long>> query =
+ inStore(UNCACHED_COUNTS_TABLE).withQuery(KeyQuery.withKey(key));
+
+ final InteractiveQueryResult<ValueAndTimestamp<Long>> result = kafkaStreams.query(query);
+
+ final ValueAndTimestamp<Long> value = result.getOnlyPartitionResult().getResult();
+
+ assertThat(value.value(), is(2L));
+ }
+
+ @Test
+ public void shouldScanUncachedTablePartitions() {
+
+ final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+ kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+ final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+ kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
+
+ System.out.println("|||" + scanResult);
+ final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
+ scanResult.getPartitionResults();
+ for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
+ try (final KeyValueIterator<Bytes, byte[]> keyValueIterator =
+ entry.getValue().getResult()) {
+ while (keyValueIterator.hasNext()) {
+ final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
+ System.out.println(
+ "|||" + entry.getKey() +
+ " " + serdes.keyFrom(next.key.get()) +
+ " " + serdes.valueFrom(next.value)
+ );
+ }
+ }
+ }
+
+ assertThat(scanResult.getPosition(),
+ is(Position.fromMap(
+ mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+ }
+
+ @Test
+ public void shouldScanUncachedTableCollated() {
+
+ final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+ kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+ final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+ kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
+
+ System.out.println("|||" + scanResult);
+ final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = scanResult.getPartitionResults();
+
+ final List<KeyValueIterator<Bytes, byte[]>> collect =
+ partitionResults
+ .values()
+ .stream()
+ .map(QueryResult::getResult)
+ .collect(Collectors.toList());
+ try (final CloseableIterator<KeyValue<Bytes, byte[]>> collate = Iterators.collate(
+ collect)) {
+ while (collate.hasNext()) {
+ final KeyValue<Bytes, byte[]> next = collate.next();
+ System.out.println(
+ "|||" +
+ " " + serdes.keyFrom(next.key.get()) +
+ " " + serdes.valueFrom(next.value)
+ );
+ }
+ }
+
+ assertThat(scanResult.getPosition(),
+ is(Position.fromMap(
+ mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+ }
+
+ @Test
+ public void shouldQueryWithinBound() {
+
+ final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+ kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+ final byte[] rawKey = serdes.rawKey(1);
+ final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+ inStore(UNCACHED_TABLE)
+ .withQuery(RawKeyQuery.withKey(rawKey))
+ .withPositionBound(
+ at(
+ Position
+ .emptyPosition()
+ .withComponent(INPUT_TOPIC_NAME, 0, 0L)
+ .withComponent(INPUT_TOPIC_NAME, 1, 1L)
+ )
+ )
+ );
+
+ System.out.println("|||" + result);
+ final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+ final ValueAndTimestamp<Integer> value =
+ serdes.valueFrom(rawValueResult.getResult());
+ System.out.println("|||" + value);
+ assertThat(result.getPosition(),
+ is(Position.fromMap(
+ mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L)))))));
+ }
+
+ @Test
+ public void shouldFailQueryOutsideOfBound() {
+
+ final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+ kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+ final byte[] rawKey = serdes.rawKey(1);
+ // intentionally setting the bound higher than the current position.
+ final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+ inStore(UNCACHED_TABLE)
+ .withQuery(RawKeyQuery.withKey(rawKey))
+ .withPositionBound(
+ at(
+ Position
+ .emptyPosition()
+ .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+ .withComponent(INPUT_TOPIC_NAME, 1, 2L)
+ )
+ )
+ );
+
+ System.out.println("|||" + result);
+ final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0);
+
+ final RuntimeException runtimeException = assertThrows(
+ RuntimeException.class,
+ rawValueResult::getResult
+ );
+ assertThat(
+ runtimeException.getMessage(),
+ is("NOT_UP_TO_BOUND: For store partition 0, the current position Position{position={input-topic={0=0}}} is not yet up to the bound PositionBound{position=Position{position={input-topic={0=1, 1=2}}}}"));
+
+ assertThat(rawValueResult.isFailure(), is(true));
+ assertThat(rawValueResult.getFailureReason(), is(FailureReason.NOT_UP_TO_BOUND));
+ assertThat(rawValueResult.getFailure(),
+ is("For store partition 0, the current position Position{position={input-topic={0=0}}} is not yet up to the bound PositionBound{position=Position{position={input-topic={0=1, 1=2}}}}"));
+ assertThat(result.getPosition(), is(Position.emptyPosition()));
+ }
+
+
+ @Test
+ public void shouldPartiallySucceedOnPartialBound() {
+
+ final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
+ kafkaStreams.serdesForStore(UNCACHED_TABLE);
+
+ final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+ kafkaStreams.query(
+ inStore(UNCACHED_TABLE)
+ .withQuery(RawScanQuery.scan())
+ .withPositionBound(
+ at(
+ Position
+ .emptyPosition()
+ .withComponent(INPUT_TOPIC_NAME, 0, 0L)
+ .withComponent(INPUT_TOPIC_NAME, 1, 2L)
+ )
+ )
+ );
+
+ System.out.println("|||" + scanResult);
+ final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = scanResult.getPartitionResults();
+ for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> value = entry.getValue();
+ if (value.isSuccess()) {
+ try (final KeyValueIterator<Bytes, byte[]> keyValueIterator =
+ value.getResult()) {
+ while (keyValueIterator.hasNext()) {
+ final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
+ System.out.println(
+ "|||" + entry.getKey() +
+ " " + serdes.keyFrom(next.key.get()) +
+ " " + serdes.valueFrom(next.value)
+ );
+ }
+ }
+ }
+ }
+
+ assertThat(scanResult.getPartitionResults().get(0).isSuccess(), is(true));
+ assertThat(scanResult.getPartitionResults().get(1).isFailure(), is(true));
+ assertThat(scanResult.getPartitionResults().get(1).getFailureReason(),
+ is(FailureReason.NOT_UP_TO_BOUND));
+ assertThat(scanResult.getPosition(),
+ is(Position.fromMap(mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L)))))));
+ }
+
+ private static Properties streamsConfiguration() {
+ final String safeTestName = IQv2IntegrationTest.class.getName();
+ final Properties config = new Properties();
+ config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+ config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+ config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+ config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+ config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+ config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+ config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+ return config;
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 066f080..3772904 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -71,6 +71,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -786,6 +787,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
EasyMock.expect(context.appConfigs())
.andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
EasyMock.expect(context.stateDir()).andStubReturn(dir);
+ EasyMock.expect(context.recordMetadata()).andReturn(Optional.empty());
EasyMock.replay(context);
rocksDBStore.init((StateStoreContext) context, rocksDBStore);
@@ -818,6 +820,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
EasyMock.expect(context.appConfigs())
.andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
EasyMock.expect(context.stateDir()).andStubReturn(dir);
+ EasyMock.expect(context.recordMetadata()).andReturn(Optional.empty());
EasyMock.replay(context);
rocksDBStore.init((StateStoreContext) context, rocksDBStore);