You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/01/03 04:25:25 UTC
[kafka] branch trunk updated: KAFKA-13494: WindowKeyQuery and WindowRangeQuery (#11567)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8f1cf1 KAFKA-13494: WindowKeyQuery and WindowRangeQuery (#11567)
b8f1cf1 is described below
commit b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c
Author: Patrick Stuedi <ps...@confluent.io>
AuthorDate: Mon Jan 3 05:17:38 2022 +0100
KAFKA-13494: WindowKeyQuery and WindowRangeQuery (#11567)
Implement WindowKeyQuery and WindowRangeQuery as
proposed in KIP-806
Reviewer: John Roesler <vv...@apache.org>
---
.../apache/kafka/streams/query/QueryResult.java | 2 +-
.../apache/kafka/streams/query/WindowKeyQuery.java | 67 ++++
.../kafka/streams/query/WindowRangeQuery.java | 69 ++++
.../streams/query/internals/FailedQueryResult.java | 10 +
.../query/internals/SucceededQueryResult.java | 9 +
.../state/internals/InMemoryKeyValueStore.java | 19 +-
.../state/internals/InMemorySessionStore.java | 6 +-
.../state/internals/InMemoryWindowStore.java | 6 +-
.../state/internals/MeteredKeyValueStore.java | 32 +-
.../state/internals/MeteredSessionStore.java | 117 +++++-
.../state/internals/MeteredWindowStore.java | 203 +++++++++-
.../internals/MeteredWindowStoreIterator.java | 11 +-
.../internals/MeteredWindowedKeyValueIterator.java | 16 +-
.../state/internals/RocksDBSessionStore.java | 6 +-
.../state/internals/RocksDBWindowStore.java | 6 +-
.../streams/state/internals/StoreQueryUtils.java | 136 +++++++
.../integration/IQv2StoreIntegrationTest.java | 418 +++++++++++++++++++--
17 files changed, 1031 insertions(+), 102 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
index 5a5987f..d8d535d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -58,7 +58,7 @@ public interface QueryResult<R> {
final Query<R> query,
final StateStore store) {
- return new FailedQueryResult<>(
+ return forFailure(
FailureReason.UNKNOWN_QUERY_TYPE,
"This store (" + store.getClass() + ") doesn't know how to execute "
+ "the given query (" + query + ")." +
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java
new file mode 100644
index 0000000..79e9dc8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/WindowKeyQuery.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+import java.util.Optional;
+
+@Evolving
+public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {
+
+ private final K key;
+ private final Optional<Instant> timeFrom;
+ private final Optional<Instant> timeTo;
+
+ private WindowKeyQuery(final K key,
+ final Optional<Instant> timeTo,
+ final Optional<Instant> timeFrom) {
+ this.key = key;
+ this.timeFrom = timeFrom;
+ this.timeTo = timeTo;
+ }
+
+ public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowStartRange(final K key,
+ final Instant timeFrom,
+ final Instant timeTo) {
+ return new WindowKeyQuery<>(key, Optional.of(timeFrom), Optional.of(timeTo));
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public Optional<Instant> getTimeFrom() {
+ return timeFrom;
+ }
+
+ public Optional<Instant> getTimeTo() {
+ return timeTo;
+ }
+
+ @Override
+ public String toString() {
+ return "WindowKeyQuery{" +
+ "key=" + key +
+ ", timeFrom=" + timeFrom +
+ ", timeTo=" + timeTo +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/WindowRangeQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/WindowRangeQuery.java
new file mode 100644
index 0000000..fb658d7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/WindowRangeQuery.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.time.Instant;
+import java.util.Optional;
+
+public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
+
+ private final Optional<K> key;
+ private final Optional<Instant> timeFrom;
+ private final Optional<Instant> timeTo;
+
+ private WindowRangeQuery(final Optional<K> key,
+ final Optional<Instant> timeFrom,
+ final Optional<Instant> timeTo) {
+ this.key = key;
+ this.timeFrom = timeFrom;
+ this.timeTo = timeTo;
+ }
+
+ public static <K, V> WindowRangeQuery<K, V> withKey(final K key) {
+ return new WindowRangeQuery<>(Optional.of(key), Optional.empty(), Optional.empty());
+ }
+
+ public static <K, V> WindowRangeQuery<K, V> withWindowStartRange(final Instant timeFrom,
+ final Instant timeTo) {
+ return new WindowRangeQuery<>(Optional.empty(), Optional.of(timeFrom), Optional.of(timeTo));
+ }
+
+ public Optional<K> getKey() {
+ return key;
+ }
+
+ public Optional<Instant> getTimeFrom() {
+ return timeFrom;
+ }
+
+ public Optional<Instant> getTimeTo() {
+ return timeTo;
+ }
+
+ @Override
+ public String toString() {
+ return "WindowRangeQuery{" +
+ "key=" + key +
+ ", timeFrom=" + timeFrom +
+ ", timeTo=" + timeTo +
+ '}';
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java
index f49cb6b..97860de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/internals/FailedQueryResult.java
@@ -86,4 +86,14 @@ public final class FailedQueryResult<R>
"Cannot get result for failed query. Failure is " + failureReason.name() + ": "
+ failure);
}
+
+ @Override
+ public String toString() {
+ return "FailedQueryResult{" +
+ "failureReason=" + failureReason +
+ ", failure='" + failure + '\'' +
+ ", executionInfo=" + getExecutionInfo() +
+ ", position=" + getPosition() +
+ '}';
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java
index 53050a5..8787ff8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java
@@ -99,4 +99,13 @@ public final class SucceededQueryResult<R>
public R getResult() {
return result;
}
+
+ @Override
+ public String toString() {
+ return "SucceededQueryResult{" +
+ "result=" + result +
+ ", executionInfo=" + getExecutionInfo() +
+ ", position=" + getPosition() +
+ '}';
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index f5f75ab..061cd85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
-import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
@@ -93,20 +92,10 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
}
@Override
- @SuppressWarnings("unchecked")
- public <R> QueryResult<R> query(
- final Query<R> query,
- final PositionBound positionBound,
- final boolean collectExecutionInfo) {
-
- if (query instanceof RangeQuery) {
- final RangeQuery<Bytes, byte[]> typedQuery = (RangeQuery<Bytes, byte[]>) query;
- final KeyValueIterator<Bytes, byte[]> keyValueIterator = this.range(
- typedQuery.getLowerBound().orElse(null), typedQuery.getUpperBound().orElse(null));
- final R result = (R) keyValueIterator;
- final QueryResult<R> queryResult = QueryResult.forResult(result);
- return queryResult;
- }
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index cf351e3..03761cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -315,8 +315,10 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
}
@Override
- public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
- final boolean collectExecutionInfo) {
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 1b49e9e..ce14dc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -351,8 +351,10 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
@Override
- public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
- final boolean collectExecutionInfo) {
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 76679af..cd5bd5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
@@ -35,12 +34,12 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
@@ -51,11 +50,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
/**
* A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
@@ -269,7 +270,7 @@ public class MeteredKeyValueStore<K, V>
final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
iterator,
getSensor,
- getValueDeserializer()
+ getDeserializeValue(serdes, wrapped())
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
@@ -296,8 +297,8 @@ public class MeteredKeyValueStore<K, V>
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
if (rawResult.isSuccess()) {
- final Deserializer<V> deserializer = getValueDeserializer();
- final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+ final Function<byte[], V> deserializer = getDeserializeValue(serdes, wrapped());
+ final V value = deserializer.apply(rawResult.getResult());
final QueryResult<V> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
result = (QueryResult<R>) typedQueryResult;
@@ -308,21 +309,6 @@ public class MeteredKeyValueStore<K, V>
return result;
}
- @SuppressWarnings({"unchecked", "rawtypes"})
- private Deserializer<V> getValueDeserializer() {
- final Serde<V> valueSerde = serdes.valueSerde();
- final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
- final Deserializer<V> deserializer;
- if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
- final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
- (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) valueSerde).deserializer();
- deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
- } else {
- deserializer = valueSerde.deserializer();
- }
- return deserializer;
- }
-
@Override
public V get(final K key) {
Objects.requireNonNull(key, "key cannot be null");
@@ -507,11 +493,11 @@ public class MeteredKeyValueStore<K, V>
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
- private final Deserializer<V> valueDeserializer;
+ private final Function<byte[], V> valueDeserializer;
private MeteredKeyValueTimestampedIterator(final KeyValueIterator<Bytes, byte[]> iter,
final Sensor sensor,
- final Deserializer<V> valueDeserializer) {
+ final Function<byte[], V> valueDeserializer) {
this.iter = iter;
this.sensor = sensor;
this.valueDeserializer = valueDeserializer;
@@ -528,7 +514,7 @@ public class MeteredKeyValueStore<K, V>
final KeyValue<Bytes, byte[]> keyValue = iter.next();
return KeyValue.pair(
serdes.keyFrom(keyValue.key.get()),
- valueDeserializer.deserialize(serdes.topic(), keyValue.value));
+ valueDeserializer.apply(keyValue.value));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index b1eb948..1425b05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -33,13 +33,22 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
+import java.util.Map;
import java.util.Objects;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
public class MeteredSessionStore<K, V>
@@ -60,6 +69,15 @@ public class MeteredSessionStore<K, V>
private InternalProcessorContext<?, ?> context;
private TaskId taskId;
+ @SuppressWarnings("rawtypes")
+ private final Map<Class, QueryHandler> queryHandlers =
+ mkMap(
+ mkEntry(
+ WindowRangeQuery.class,
+ (query, positionBound, collectExecutionInfo, store) -> runRangeQuery(query, positionBound, collectExecutionInfo)
+ )
+ );
+
MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
final String metricsScope,
@@ -230,7 +248,8 @@ public class MeteredSessionStore<K, V>
wrapped().fetch(keyBytes(key)),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time);
}
@@ -241,7 +260,8 @@ public class MeteredSessionStore<K, V>
wrapped().backwardFetch(keyBytes(key)),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time
);
}
@@ -253,7 +273,8 @@ public class MeteredSessionStore<K, V>
wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time);
}
@@ -264,7 +285,8 @@ public class MeteredSessionStore<K, V>
wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time
);
}
@@ -282,7 +304,8 @@ public class MeteredSessionStore<K, V>
latestSessionStartTime),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time);
}
@@ -300,7 +323,8 @@ public class MeteredSessionStore<K, V>
),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time
);
}
@@ -320,7 +344,8 @@ public class MeteredSessionStore<K, V>
latestSessionStartTime),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time);
}
@@ -340,7 +365,8 @@ public class MeteredSessionStore<K, V>
),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time
);
}
@@ -359,6 +385,81 @@ public class MeteredSessionStore<K, V>
}
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ final QueryHandler handler = queryHandlers.get(query.getClass());
+ if (handler == null) {
+ result = wrapped().query(query, positionBound, collectExecutionInfo);
+ if (collectExecutionInfo) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+ }
+ } else {
+ result = (QueryResult<R>) handler.apply(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this
+ );
+ if (collectExecutionInfo) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " with serdes "
+ + serdes + " in " + (time.nanoseconds() - start) + "ns");
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+ final QueryResult<R> result;
+ final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
+ if (typedQuery.getKey().isPresent()) {
+ final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+ WindowRangeQuery.withKey(
+ Bytes.wrap(serdes.rawKey(typedQuery.getKey().get()))
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+ if (rawResult.isSuccess()) {
+ final MeteredWindowedKeyValueIterator<K, V> typedResult =
+ new MeteredWindowedKeyValueIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ streamsMetrics,
+ serdes::keyFrom,
+ StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
+ time
+ );
+ final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
+ QueryResult.forResult(typedResult);
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ } else {
+
+ result = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " SessionStores only support WindowRangeQuery.withKey."
+ + " Contact the store maintainer if you need support"
+ + " for a new query type."
+ );
+ }
+ return result;
+ }
+
private Bytes keyBytes(final K key) {
return key == null ? null : Bytes.wrap(serdes.rawKey(key));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 0970703..605e821 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -33,16 +33,27 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
+import java.util.Map;
import java.util.Objects;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
public class MeteredWindowStore<K, V>
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
@@ -62,6 +73,27 @@ public class MeteredWindowStore<K, V>
private InternalProcessorContext<?, ?> context;
private TaskId taskId;
+ @SuppressWarnings("rawtypes")
+ private final Map<Class, QueryHandler> queryHandlers =
+ mkMap(
+ mkEntry(
+ WindowRangeQuery.class,
+ (query, positionBound, collectExecutionInfo, store) -> runRangeQuery(
+ query,
+ positionBound,
+ collectExecutionInfo
+ )
+ ),
+ mkEntry(
+ WindowKeyQuery.class,
+ (query, positionBound, collectExecutionInfo, store) -> runKeyQuery(
+ query,
+ positionBound,
+ collectExecutionInfo
+ )
+ )
+ );
+
MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
final long windowSizeMs,
final String metricsScope,
@@ -205,7 +237,7 @@ public class MeteredWindowStore<K, V>
wrapped().fetch(keyBytes(key), timeFrom, timeTo),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::valueFrom,
time
);
}
@@ -219,7 +251,7 @@ public class MeteredWindowStore<K, V>
wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::valueFrom,
time
);
}
@@ -237,7 +269,8 @@ public class MeteredWindowStore<K, V>
timeTo),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time);
}
@@ -254,7 +287,8 @@ public class MeteredWindowStore<K, V>
timeTo),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time);
}
@@ -265,7 +299,8 @@ public class MeteredWindowStore<K, V>
wrapped().fetchAll(timeFrom, timeTo),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time);
}
@@ -276,18 +311,33 @@ public class MeteredWindowStore<K, V>
wrapped().backwardFetchAll(timeFrom, timeTo),
fetchSensor,
streamsMetrics,
- serdes,
+ serdes::keyFrom,
+ serdes::valueFrom,
time);
}
@Override
public KeyValueIterator<Windowed<K>, V> all() {
- return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
+ return new MeteredWindowedKeyValueIterator<>(
+ wrapped().all(),
+ fetchSensor,
+ streamsMetrics,
+ serdes::keyFrom,
+ serdes::valueFrom,
+ time
+ );
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardAll() {
- return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor, streamsMetrics, serdes, time);
+ return new MeteredWindowedKeyValueIterator<>(
+ wrapped().backwardAll(),
+ fetchSensor,
+ streamsMetrics,
+ serdes::keyFrom,
+ serdes::valueFrom,
+ time
+ );
}
@Override
@@ -304,10 +354,147 @@ public class MeteredWindowStore<K, V>
}
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ final QueryHandler handler = queryHandlers.get(query.getClass());
+ if (handler == null) {
+ result = wrapped().query(query, positionBound, collectExecutionInfo);
+ if (collectExecutionInfo) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+ }
+ } else {
+ result = (QueryResult<R>) handler.apply(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this
+ );
+ if (collectExecutionInfo) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " with serdes "
+ + serdes + " in " + (time.nanoseconds() - start) + "ns");
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+ final QueryResult<R> result;
+ final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>) query;
+ // There's no store API for open time ranges
+ if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
+ final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+ WindowRangeQuery.withWindowStartRange(
+ typedQuery.getTimeFrom().get(),
+ typedQuery.getTimeTo().get()
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
+ wrapped().query(
+ rawKeyQuery,
+ positionBound,
+ collectExecutionInfo
+ );
+ if (rawResult.isSuccess()) {
+ final MeteredWindowedKeyValueIterator<K, V> typedResult =
+ new MeteredWindowedKeyValueIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ streamsMetrics,
+ serdes::keyFrom,
+ getDeserializeValue(serdes, wrapped()),
+ time
+ );
+ final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
+ QueryResult.forResult(
+ typedResult
+ );
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ } else {
+
+ result = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " WindowStores only supports WindowRangeQuery.withWindowStartRange."
+ + " Contact the store maintainer if you need support"
+ + " for a new query type."
+ );
+ }
+ return result;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+ final QueryResult<R> queryResult;
+ final WindowKeyQuery<K, V> typedQuery = (WindowKeyQuery<K, V>) query;
+ // There's no store API for open time ranges
+ if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
+ final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
+ WindowKeyQuery.withKeyAndWindowStartRange(
+ keyBytes(typedQuery.getKey()),
+ typedQuery.getTimeFrom().get(),
+ typedQuery.getTimeTo().get()
+ );
+ final QueryResult<WindowStoreIterator<byte[]>> rawResult = wrapped().query(
+ rawKeyQuery,
+ positionBound,
+ collectExecutionInfo
+ );
+ if (rawResult.isSuccess()) {
+ final MeteredWindowStoreIterator<V> typedResult = new MeteredWindowStoreIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ streamsMetrics,
+ getDeserializeValue(serdes, wrapped()),
+ time
+ );
+ final QueryResult<MeteredWindowStoreIterator<V>> typedQueryResult =
+ QueryResult.forResult(
+ typedResult
+ );
+ queryResult = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ queryResult = (QueryResult<R>) rawResult;
+ }
+ } else {
+
+ queryResult = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to execute"
+ + " the given query (" + query + ") because it only supports closed-range"
+ + " queries."
+ + " Contact the store maintainer if you need support for a new query type."
+ );
+ }
+ return queryResult;
+ }
+
private Bytes keyBytes(final K key) {
return Bytes.wrap(serdes.rawKey(key));
}
+ protected V outerValue(final byte[] value) {
+ return value != null ? serdes.valueFrom(value) : null;
+ }
+
private void maybeRecordE2ELatency() {
// Context is null if the provided context isn't an implementation of InternalProcessorContext.
// In that case, we _can't_ get the current timestamp, so we don't record anything.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
index 98bc655..b368b7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
@@ -20,27 +20,28 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import java.util.function.Function;
+
class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
private final WindowStoreIterator<byte[]> iter;
private final Sensor sensor;
private final StreamsMetrics metrics;
- private final StateSerdes<?, V> serdes;
+ private final Function<byte[], V> valueFrom;
private final long startNs;
private final Time time;
MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
final Sensor sensor,
final StreamsMetrics metrics,
- final StateSerdes<?, V> serdes,
+ final Function<byte[], V> valueFrom,
final Time time) {
this.iter = iter;
this.sensor = sensor;
this.metrics = metrics;
- this.serdes = serdes;
+ this.valueFrom = valueFrom;
this.startNs = time.nanoseconds();
this.time = time;
}
@@ -53,7 +54,7 @@ class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
@Override
public KeyValue<Long, V> next() {
final KeyValue<Long, byte[]> next = iter.next();
- return KeyValue.pair(next.key, serdes.valueFrom(next.value));
+ return KeyValue.pair(next.key, valueFrom.apply(next.value));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
index 411871d..6809380 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
@@ -23,26 +23,30 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.function.Function;
class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
private final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
private final Sensor sensor;
private final StreamsMetrics metrics;
- private final StateSerdes<K, V> serdes;
+ private final Function<byte[], K> deserializeKey;
+ private final Function<byte[], V> deserializeValue;
private final long startNs;
private final Time time;
MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
final Sensor sensor,
final StreamsMetrics metrics,
- final StateSerdes<K, V> serdes,
+ final Function<byte[], K> deserializeKey,
+ final Function<byte[], V> deserializeValue,
final Time time) {
this.iter = iter;
this.sensor = sensor;
this.metrics = metrics;
- this.serdes = serdes;
+ this.deserializeKey = deserializeKey;
+ this.deserializeValue = deserializeValue;
this.startNs = time.nanoseconds();
this.time = time;
}
@@ -55,11 +59,11 @@ class MeteredWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed
@Override
public KeyValue<Windowed<K>, V> next() {
final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
- return KeyValue.pair(windowedKey(next.key), serdes.valueFrom(next.value));
+ return KeyValue.pair(windowedKey(next.key), deserializeValue.apply(next.value));
}
private Windowed<K> windowedKey(final Windowed<Bytes> bytesKey) {
- final K key = serdes.keyFrom(bytesKey.key().get());
+ final K key = deserializeKey.apply(bytesKey.key().get());
return new Windowed<>(key, bytesKey.window());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index e6e2740..d907183 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -51,8 +51,10 @@ public class RocksDBSessionStore
}
@Override
- public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
- final boolean collectExecutionInfo) {
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 40415c2..c7d941b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -129,8 +129,10 @@ public class RocksDBWindowStore
}
@Override
- public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
- final boolean collectExecutionInfo) {
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index e1d20c9..f0cbdc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -16,7 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
@@ -27,13 +30,20 @@ import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -65,6 +75,14 @@ public final class StoreQueryUtils {
mkEntry(
KeyQuery.class,
StoreQueryUtils::runKeyQuery
+ ),
+ mkEntry(
+ WindowKeyQuery.class,
+ StoreQueryUtils::runWindowKeyQuery
+ ),
+ mkEntry(
+ WindowRangeQuery.class,
+ StoreQueryUtils::runWindowRangeQuery
)
);
@@ -197,6 +215,124 @@ public final class StoreQueryUtils {
}
}
+ @SuppressWarnings("unchecked")
+ private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo,
+ final StateStore store) {
+ if (store instanceof WindowStore) {
+ final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
+ (WindowKeyQuery<Bytes, byte[]>) query;
+ final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
+ try {
+ if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
+ final WindowStoreIterator<byte[]> iterator = windowStore.fetch(
+ windowKeyQuery.getKey(),
+ windowKeyQuery.getTimeFrom().get(),
+ windowKeyQuery.getTimeTo().get()
+ );
+ return (QueryResult<R>) QueryResult.forResult(iterator);
+ } else {
+ return QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + store.getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because it only supports"
+ + " closed-range queries."
+ + " Contact the store maintainer if you need support"
+ + " for a new query type."
+ );
+ }
+ } catch (final Exception e) {
+ final String message = parseStoreException(e, store, query);
+ return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
+ }
+ } else {
+ return QueryResult.forUnknownQueryType(query, store);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <R> QueryResult<R> runWindowRangeQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo,
+ final StateStore store) {
+ if (store instanceof WindowStore) {
+ final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
+ (WindowRangeQuery<Bytes, byte[]>) query;
+ final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
+ try {
+ // There's no store API for open time ranges
+ if (windowRangeQuery.getTimeFrom().isPresent() && windowRangeQuery.getTimeTo().isPresent()) {
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+ windowStore.fetchAll(
+ windowRangeQuery.getTimeFrom().get(),
+ windowRangeQuery.getTimeTo().get()
+ );
+ return (QueryResult<R>) QueryResult.forResult(iterator);
+ } else {
+ return QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + store.getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " WindowStores only supports WindowRangeQuery.withWindowStartRange."
+ + " Contact the store maintainer if you need support"
+ + " for a new query type."
+ );
+ }
+ } catch (final Exception e) {
+ final String message = parseStoreException(e, store, query);
+ return QueryResult.forFailure(
+ FailureReason.STORE_EXCEPTION,
+ message
+ );
+ }
+ } else if (store instanceof SessionStore) {
+ final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
+ (WindowRangeQuery<Bytes, byte[]>) query;
+ final SessionStore<Bytes, byte[]> sessionStore = (SessionStore<Bytes, byte[]>) store;
+ try {
+ if (windowRangeQuery.getKey().isPresent()) {
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = sessionStore.fetch(
+ windowRangeQuery.getKey().get());
+ return (QueryResult<R>) QueryResult.forResult(iterator);
+ } else {
+ return QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + store.getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " SessionStores only support WindowRangeQuery.withKey."
+ + " Contact the store maintainer if you need support"
+ + " for a new query type."
+ );
+ }
+ } catch (final Exception e) {
+ final String message = parseStoreException(e, store, query);
+ return QueryResult.forFailure(
+ FailureReason.STORE_EXCEPTION,
+ message
+ );
+ }
+ } else {
+ return QueryResult.forUnknownQueryType(query, store);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes,
+ final StateStore wrapped) {
+ final Serde<V> valueSerde = serdes.valueSerde();
+ final boolean timestamped = WrappedStateStore.isTimestamped(wrapped);
+ final Deserializer<V> deserializer;
+ if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
+ final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+ (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) valueSerde).deserializer();
+ deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+ } else {
+ deserializer = valueSerde.deserializer();
+ }
+ return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
+ }
+
private static <R> String parseStoreException(final Exception e, final StateStore store, final Query<R> query) {
final StringWriter stringWriter = new StringWriter();
final PrintWriter printWriter = new PrintWriter(stringWriter);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index cfb3860..a08a04e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
@@ -44,6 +45,8 @@ import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -54,6 +57,7 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -67,6 +71,7 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -107,13 +112,13 @@ public class IQv2StoreIntegrationTest {
private static final Position INPUT_POSITION = Position.emptyPosition();
private static final String STORE_NAME = "kv-store";
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- private final StoresToTest storeToTest;
+ private static final long RECORD_TIME = System.currentTimeMillis();
- public static class UnknownQuery implements Query<Void> {
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- }
+ public static class UnknownQuery implements Query<Void> { }
+ private final StoresToTest storeToTest;
private final boolean cache;
private final boolean log;
@@ -244,6 +249,11 @@ public class IQv2StoreIntegrationTest {
return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
false);
}
+
+ @Override
+ public boolean isWindowed() {
+ return true;
+ }
},
ROCKS_WINDOW {
@Override
@@ -251,6 +261,16 @@ public class IQv2StoreIntegrationTest {
return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
false);
}
+
+ @Override
+ public boolean isWindowed() {
+ return true;
+ }
+
+ @Override
+ public boolean timestamped() {
+ return false;
+ }
},
TIME_ROCKS_WINDOW {
@Override
@@ -258,18 +278,33 @@ public class IQv2StoreIntegrationTest {
return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
WINDOW_SIZE, false);
}
+
+ @Override
+ public boolean isWindowed() {
+ return true;
+ }
},
IN_MEMORY_SESSION {
@Override
public StoreSupplier<?> supplier() {
return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1));
}
+
+ @Override
+ public boolean isSession() {
+ return true;
+ }
},
ROCKS_SESSION {
@Override
public StoreSupplier<?> supplier() {
return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
}
+
+ @Override
+ public boolean isSession() {
+ return true;
+ }
};
public abstract StoreSupplier<?> supplier();
@@ -285,6 +320,14 @@ public class IQv2StoreIntegrationTest {
public boolean keyValue() {
return false;
}
+
+ public boolean isWindowed() {
+ return false;
+ }
+
+ public boolean isSession() {
+ return false;
+ }
}
@Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
@@ -329,7 +372,7 @@ public class IQv2StoreIntegrationTest {
new ProducerRecord<>(
INPUT_TOPIC_NAME,
i % partitions,
- Time.SYSTEM.milliseconds(),
+ RECORD_TIME,
i,
i,
null
@@ -497,6 +540,25 @@ public class IQv2StoreIntegrationTest {
shouldHandleRangeQueries(valueExtractor);
}
}
+
+ if (storeToTest.isWindowed()) {
+ if (storeToTest.timestamped()) {
+ final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
+ ValueAndTimestamp::value;
+ shouldHandleWindowKeyQueries(valueExtractor);
+ shouldHandleWindowRangeQueries(valueExtractor);
+ } else {
+ final Function<Integer, Integer> valueExtractor = Function.identity();
+ shouldHandleWindowKeyQueries(valueExtractor);
+ shouldHandleWindowRangeQueries(valueExtractor);
+ }
+ }
+
+ if (storeToTest.isSession()) {
+ // Note there's no "timestamped" differentiation here.
+ // Idiosyncratically, SessionStores are _never_ timestamped.
+ shouldHandleSessionKeyQueries();
+ }
}
}
@@ -532,6 +594,157 @@ public class IQv2StoreIntegrationTest {
);
}
+ private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> extractor) {
+
+ final long windowSize = WINDOW_SIZE.toMillis();
+ final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+ // tightest possible start range
+ shouldHandleWindowKeyQuery(
+ 2,
+ Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart),
+ extractor,
+ mkSet(2)
+ );
+
+ // miss the window start range
+ shouldHandleWindowKeyQuery(
+ 2,
+ Instant.ofEpochMilli(windowStart - 1),
+ Instant.ofEpochMilli(windowStart - 1),
+ extractor,
+ mkSet()
+ );
+
+ // miss the key
+ shouldHandleWindowKeyQuery(
+ 999,
+ Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart),
+ extractor,
+ mkSet()
+ );
+
+ // miss both
+ shouldHandleWindowKeyQuery(
+ 999,
+ Instant.ofEpochMilli(windowStart - 1),
+ Instant.ofEpochMilli(windowStart - 1),
+ extractor,
+ mkSet()
+ );
+ }
+
+ private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer> extractor) {
+ final long windowSize = WINDOW_SIZE.toMillis();
+ final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart),
+ extractor,
+ mkSet(0, 1, 2, 3)
+ );
+
+ // miss the window start
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart - 1),
+ Instant.ofEpochMilli(windowStart - 1),
+ extractor,
+ mkSet()
+ );
+
+ // Should fail to execute this query on a WindowStore.
+ final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2);
+
+ final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+ final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ if (result.getGlobalResult() != null) {
+ fail("global tables aren't implemented");
+ } else {
+ final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, T>>> queryResult =
+ result.getPartitionResults();
+ for (final int partition : queryResult.keySet()) {
+ final QueryResult<KeyValueIterator<Windowed<Integer>, T>> partitionResult =
+ queryResult.get(partition);
+ final boolean failure = partitionResult.isFailure();
+ if (!failure) {
+ throw new AssertionError(queryResult.toString());
+ }
+ assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
+ assertThat(partitionResult.getFailureMessage(), is(
+ "This store"
+ + " (class org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore)"
+ + " doesn't know how to execute the given query"
+ + " (WindowRangeQuery{key=Optional[2], timeFrom=Optional.empty, timeTo=Optional.empty})"
+ + " because WindowStores only supports WindowRangeQuery.withWindowStartRange."
+ + " Contact the store maintainer if you need support for a new query type."
+ ));
+ }
+ }
+ }
+
+ private <T> void shouldHandleSessionKeyQueries() {
+ shouldHandleSessionRangeQuery(
+ 2,
+ mkSet(2)
+ );
+
+ // not preset, so empty result iter
+ shouldHandleSessionRangeQuery(
+ 999,
+ mkSet()
+ );
+
+ // Should fail to execute this query on a SessionStore.
+ final WindowRangeQuery<Integer, T> query =
+ WindowRangeQuery.withWindowStartRange(
+ Instant.ofEpochMilli(0L),
+ Instant.ofEpochMilli(0L)
+ );
+
+ final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+ final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ if (result.getGlobalResult() != null) {
+ fail("global tables aren't implemented");
+ } else {
+ final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, T>>> queryResult =
+ result.getPartitionResults();
+ for (final int partition : queryResult.keySet()) {
+ final QueryResult<KeyValueIterator<Windowed<Integer>, T>> partitionResult =
+ queryResult.get(partition);
+ final boolean failure = partitionResult.isFailure();
+ if (!failure) {
+ throw new AssertionError(queryResult.toString());
+ }
+ assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
+ assertThat(partitionResult.getFailureMessage(), is(
+ "This store"
+ + " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)"
+ + " doesn't know how to execute the given query"
+ + " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})"
+ + " because SessionStores only support WindowRangeQuery.withKey."
+ + " Contact the store maintainer if you need support for a new query type."
+ ));
+ }
+ }
+ }
+
private void globalShouldRejectAllQueries() {
// See KAFKA-13523
@@ -581,6 +794,41 @@ public class IQv2StoreIntegrationTest {
);
}
+ public <V> void shouldHandleKeyQuery(
+ final Integer key,
+ final Function<V, Integer> valueExtactor,
+ final Integer expectedValue) {
+
+ final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+ final StateQueryRequest<V> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+ final StateQueryResult<V> result =
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ final QueryResult<V> queryResult = result.getOnlyPartitionResult();
+ final boolean failure = queryResult.isFailure();
+ if (failure) {
+ throw new AssertionError(queryResult.toString());
+ }
+ assertThat(queryResult.isSuccess(), is(true));
+
+ assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+ assertThrows(
+ IllegalArgumentException.class,
+ queryResult::getFailureMessage
+ );
+
+ final V result1 = queryResult.getResult();
+ final Integer integer = valueExtactor.apply(result1);
+ assertThat(integer, is(expectedValue));
+
+ assertThat(queryResult.getExecutionInfo(), is(empty()));
+ }
+
public <V> void shouldHandleRangeQuery(
final Optional<Integer> lower,
final Optional<Integer> upper,
@@ -627,10 +875,10 @@ public class IQv2StoreIntegrationTest {
queryResult.get(partition)::getFailureMessage
);
- final KeyValueIterator<Integer, V> iterator = queryResult.get(partition)
- .getResult();
- while (iterator.hasNext()) {
- actualValue.add(valueExtactor.apply(iterator.next().value));
+ try (final KeyValueIterator<Integer, V> iterator = queryResult.get(partition).getResult()) {
+ while (iterator.hasNext()) {
+ actualValue.add(valueExtactor.apply(iterator.next().value));
+ }
}
assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
}
@@ -638,39 +886,153 @@ public class IQv2StoreIntegrationTest {
}
}
- public <V> void shouldHandleKeyQuery(
+ public <V> void shouldHandleWindowKeyQuery(
final Integer key,
+ final Instant timeFrom,
+ final Instant timeTo,
final Function<V, Integer> valueExtactor,
- final Integer expectedValue) {
+ final Set<Integer> expectedValue) {
- final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
- final StateQueryRequest<V> request =
+ final WindowKeyQuery<Integer, V> query = WindowKeyQuery.withKeyAndWindowStartRange(
+ key,
+ timeFrom,
+ timeTo
+ );
+
+ final StateQueryRequest<WindowStoreIterator<V>> request =
inStore(STORE_NAME)
.withQuery(query)
.withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION));
- final StateQueryResult<V> result =
+ final StateQueryResult<WindowStoreIterator<V>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
- final QueryResult<V> queryResult = result.getOnlyPartitionResult();
- final boolean failure = queryResult.isFailure();
- if (failure) {
- throw new AssertionError(queryResult.toString());
+ if (result.getGlobalResult() != null) {
+ fail("global tables aren't implemented");
+ } else {
+ final Set<Integer> actualValue = new HashSet<>();
+ final Map<Integer, QueryResult<WindowStoreIterator<V>>> queryResult = result.getPartitionResults();
+ for (final int partition : queryResult.keySet()) {
+ final boolean failure = queryResult.get(partition).isFailure();
+ if (failure) {
+ throw new AssertionError(queryResult.toString());
+ }
+ assertThat(queryResult.get(partition).isSuccess(), is(true));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureReason
+ );
+ assertThrows(
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureMessage
+ );
+
+ try (final WindowStoreIterator<V> iterator = queryResult.get(partition).getResult()) {
+ while (iterator.hasNext()) {
+ actualValue.add(valueExtactor.apply(iterator.next().value));
+ }
+ }
+ assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
+ }
+ assertThat(actualValue, is(expectedValue));
}
- assertThat(queryResult.isSuccess(), is(true));
+ }
- assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
- assertThrows(
- IllegalArgumentException.class,
- queryResult::getFailureMessage
- );
+ public <V> void shouldHandleWindowRangeQuery(
+ final Instant timeFrom,
+ final Instant timeTo,
+ final Function<V, Integer> valueExtactor,
+ final Set<Integer> expectedValue) {
- final V result1 = queryResult.getResult();
- final Integer integer = valueExtactor.apply(result1);
- assertThat(integer, is(expectedValue));
+ final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withWindowStartRange(timeFrom, timeTo);
- assertThat(queryResult.getExecutionInfo(), is(empty()));
+ final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+ final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ if (result.getGlobalResult() != null) {
+ fail("global tables aren't implemented");
+ } else {
+ final Set<Integer> actualValue = new HashSet<>();
+ final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult = result.getPartitionResults();
+ for (final int partition : queryResult.keySet()) {
+ final boolean failure = queryResult.get(partition).isFailure();
+ if (failure) {
+ throw new AssertionError(queryResult.toString());
+ }
+ assertThat(queryResult.get(partition).isSuccess(), is(true));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureReason
+ );
+ assertThrows(
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureMessage
+ );
+
+ try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
+ while (iterator.hasNext()) {
+ actualValue.add(valueExtactor.apply(iterator.next().value));
+ }
+ }
+ assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
+ }
+ assertThat(actualValue, is(expectedValue));
+ }
+ }
+
+ public <V> void shouldHandleSessionRangeQuery(
+ final Integer key,
+ final Set<Integer> expectedValue) {
+
+ final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withKey(key);
+
+ final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
+ final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ if (result.getGlobalResult() != null) {
+ fail("global tables aren't implemented");
+ } else {
+ final Set<Integer> actualValue = new HashSet<>();
+ final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult = result.getPartitionResults();
+ for (final int partition : queryResult.keySet()) {
+ final boolean failure = queryResult.get(partition).isFailure();
+ if (failure) {
+ throw new AssertionError(queryResult.toString());
+ }
+ assertThat(queryResult.get(partition).isSuccess(), is(true));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureReason
+ );
+ assertThrows(
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureMessage
+ );
+
+ try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
+ while (iterator.hasNext()) {
+ actualValue.add((Integer) iterator.next().value);
+ }
+ }
+ assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
+ }
+ assertThat(actualValue, is(expectedValue));
+ }
}
public void shouldCollectExecutionInfo() {