You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/11/16 01:35:01 UTC
(kafka) branch trunk updated: KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) (#14596)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0489b7cd331 KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) (#14596)
0489b7cd331 is described below
commit 0489b7cd331b22b5a2912e79e1cfa517ba6cecc9
Author: Alieh <10...@users.noreply.github.com>
AuthorDate: Thu Nov 16 02:34:54 2023 +0100
KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) (#14596)
This PR implements KIP-960 which add support for `VersionedKeyQuery`.
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../kafka/streams/query/VersionedKeyQuery.java | 87 +++++++++++
.../internals/MeteredVersionedKeyValueStore.java | 80 ++++++++++
.../state/internals/RocksDBVersionedStore.java | 27 +++-
.../streams/state/internals/StoreQueryUtils.java | 47 ++++++
.../IQv2VersionedStoreIntegrationTest.java | 166 +++++++++++++++++++++
.../kafka/streams/query/VersionedKeyQueryTest.java | 38 +++++
6 files changed, 444 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java
new file mode 100644
index 00000000000..869b74ba0cf
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+@Evolving
+public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {
+
+ private final K key;
+ private final Optional<Instant> asOfTimestamp;
+
+ private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp) {
+ this.key = key;
+ this.asOfTimestamp = asOfTimestamp;
+ }
+
+ /**
+ * Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists
+ * (or {@code null} otherwise).
+ * <p>
+ * While the query by default returns the latest value of the specified {@code key}, setting
+ * the {@code asOfTimestamp} (by calling the {@link #asOf(Instant)} method), makes the query
+ * to return the value associated to the specified {@code asOfTimestamp}.
+ *
+ * @param key The key to retrieve
+ * @param <K> The type of the key
+ * @param <V> The type of the value that will be retrieved
+ * @throws NullPointerException if {@code key} is null
+ */
+ public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) {
+ Objects.requireNonNull(key, "key cannot be null.");
+ return new VersionedKeyQuery<>(key, Optional.empty());
+ }
+
+ /**
+ * Specifies the timestamp for the key query. The key query returns the record's version for the specified timestamp.
+ * (To be more precise: The key query returns the record with the greatest timestamp <= asOfTimestamp)
+ *
+ * @param asOfTimestamp The timestamp of the query.
+ * @throws NullPointerException if {@code asOfTimestamp} is null
+ */
+ public VersionedKeyQuery<K, V> asOf(final Instant asOfTimestamp) {
+ Objects.requireNonNull(asOfTimestamp, "asOf timestamp cannot be null.");
+ return new VersionedKeyQuery<>(key, Optional.of(asOfTimestamp));
+ }
+
+ /**
+ * The key that was specified for this query.
+ * @return The specified {@code key} of the query.
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * The timestamp of the query, if specified.
+ * @return The specified {@code asOfTimestamp} of the query.
+ */
+ public Optional<Instant> asOfTimestamp() {
+ return asOfTimestamp;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index 8260f1a0bff..d3a054a34e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -16,12 +16,17 @@
*/
package org.apache.kafka.streams.state.internals;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerde;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -29,11 +34,15 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.VersionedKeyQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -41,6 +50,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedBytesStore;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
/**
* A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
@@ -89,6 +99,22 @@ public class MeteredVersionedKeyValueStore<K, V>
private final Serde<V> plainValueSerde;
private StateSerdes<K, V> plainValueSerdes;
+ private final Map<Class, QueryHandler> queryHandlers =
+ mkMap(
+ mkEntry(
+ RangeQuery.class,
+ (query, positionBound, config, store) -> runRangeQuery(query, positionBound, config)
+ ),
+ mkEntry(
+ KeyQuery.class,
+ (query, positionBound, config, store) -> runKeyQuery(query, positionBound, config)
+ ),
+ mkEntry(
+ VersionedKeyQuery.class,
+ (query, positionBound, config, store) -> runVersionedKeyQuery(query, positionBound, config)
+ )
+ );
+
MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
final String metricScope,
final Time time,
@@ -139,6 +165,36 @@ public class MeteredVersionedKeyValueStore<K, V>
}
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, final QueryConfig config) {
+
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ final QueryHandler handler = queryHandlers.get(query.getClass());
+ if (handler == null) {
+ result = wrapped().query(query, positionBound, config);
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+ }
+ } else {
+ result = (QueryResult<R>) handler.apply(
+ query,
+ positionBound,
+ config,
+ this
+ );
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " with serdes "
+ + serdes + " in " + (time.nanoseconds() - start) + "ns");
+ }
+ }
+ return result;
+ }
+
@Override
protected <R> QueryResult<R> runRangeQuery(final Query<R> query,
final PositionBound positionBound,
@@ -157,6 +213,30 @@ public class MeteredVersionedKeyValueStore<K, V>
throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time.");
}
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final QueryResult<R> result;
+ final VersionedKeyQuery<K, V> typedKeyQuery = (VersionedKeyQuery<K, V>) query;
+ VersionedKeyQuery<Bytes, byte[]> rawKeyQuery = VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+ if (typedKeyQuery.asOfTimestamp().isPresent()) {
+ rawKeyQuery = rawKeyQuery.asOf(typedKeyQuery.asOfTimestamp().get());
+ }
+ final QueryResult<VersionedRecord<byte[]>> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess() && rawResult.getResult() != null) {
+ final VersionedRecord<V> versionedRecord = StoreQueryUtils.deserializeVersionedRecord(plainValueSerdes, rawResult.getResult());
+ final QueryResult<VersionedRecord<V>> typedQueryResult =
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, versionedRecord);
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
@SuppressWarnings("unchecked")
@Override
protected Serde<ValueAndTimestamp<V>> prepareValueSerdeForStore(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index e33988b971c..c67d9a2fb5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -41,6 +41,10 @@ import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapt
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
@@ -128,13 +132,17 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
}
observedStreamTime = Math.max(observedStreamTime, timestamp);
- return doPut(
+ final long foundTs = doPut(
versionedStoreClient,
observedStreamTime,
key,
value,
timestamp
);
+
+ StoreQueryUtils.updatePosition(position, stateStoreContext);
+
+ return foundTs;
}
@Override
@@ -159,6 +167,8 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
timestamp
);
+ StoreQueryUtils.updatePosition(position, stateStoreContext);
+
return existingRecord;
}
@@ -274,6 +284,21 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
// same physical RocksDB instance
}
+ @Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ return StoreQueryUtils.handleBasicQueries(
+ query,
+ positionBound,
+ config,
+ this,
+ position,
+ stateStoreContext
+ );
+ }
+
@Override
public boolean persistent() {
return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index f8d9df6e90f..3278245f11b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -33,12 +33,15 @@ import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.VersionedKeyQuery;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -90,6 +93,10 @@ public final class StoreQueryUtils {
mkEntry(
WindowRangeQuery.class,
StoreQueryUtils::runWindowRangeQuery
+ ),
+ mkEntry(
+ VersionedKeyQuery.class,
+ StoreQueryUtils::runVersionedKeyQuery
)
);
@@ -335,6 +342,38 @@ public final class StoreQueryUtils {
}
}
+ @SuppressWarnings("unchecked")
+ private static <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config,
+ final StateStore store) {
+ if (store instanceof VersionedKeyValueStore) {
+ final VersionedKeyValueStore<Bytes, byte[]> versionedKeyValueStore =
+ (VersionedKeyValueStore<Bytes, byte[]>) store;
+ final VersionedKeyQuery<Bytes, byte[]> rawKeyQuery =
+ (VersionedKeyQuery<Bytes, byte[]>) query;
+ try {
+ final VersionedRecord<byte[]> bytes;
+ if (((VersionedKeyQuery<?, ?>) query).asOfTimestamp().isPresent()) {
+ bytes = versionedKeyValueStore.get(rawKeyQuery.key(),
+ ((VersionedKeyQuery<?, ?>) query).asOfTimestamp().get().toEpochMilli());
+ } else {
+ bytes = versionedKeyValueStore.get(rawKeyQuery.key());
+ }
+ return (QueryResult<R>) QueryResult.forResult(bytes);
+ } catch (final Exception e) {
+ final String message = parseStoreException(e, store, query);
+ return QueryResult.forFailure(
+ FailureReason.STORE_EXCEPTION,
+ message
+ );
+ }
+
+ } else {
+ return QueryResult.forUnknownQueryType(query, store);
+ }
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes,
final StateStore wrapped) {
@@ -351,6 +390,14 @@ public final class StoreQueryUtils {
return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
}
+ public static <V> VersionedRecord<V> deserializeVersionedRecord(final StateSerdes<?, V> serdes,
+ final VersionedRecord<byte[]> rawVersionedRecord) {
+ final Deserializer<V> valueDeserializer = serdes.valueDeserializer();
+ final long timestamp = rawVersionedRecord.timestamp();
+ final V value = valueDeserializer.deserialize(serdes.topic(), rawVersionedRecord.value());
+ return new VersionedRecord<>(value, timestamp);
+ }
+
public static void checkpointPosition(final OffsetCheckpoint checkpointFile,
final Position position) {
try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
new file mode 100644
index 00000000000..339a62dd74b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.query.VersionedKeyQuery;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IntegrationTest.class})
+public class IQv2VersionedStoreIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+ private static final String INPUT_TOPIC_NAME = "input-topic";
+ private static final String STORE_NAME = "versioned-store";
+ private static final Duration HISTORY_RETENTION = Duration.ofDays(1);
+ private static final Instant BASE_TIMESTAMP = Instant.parse("2023-01-01T10:00:00.00Z");
+ private static final Long RECORD_TIMESTAMP_OLD = BASE_TIMESTAMP.toEpochMilli();
+ private static final Long RECORD_TIMESTAMP_NEW = RECORD_TIMESTAMP_OLD + 100;
+ private static final int RECORD_KEY = 2;
+ private static final int RECORD_VALUE_OLD = 2;
+ private static final int RECORD_VALUE_NEW = 3;
+ private static final Position INPUT_POSITION = Position.emptyPosition();
+
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
+ Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
+ private KafkaStreams kafkaStreams;
+
+ @BeforeClass
+ public static void before() throws Exception {
+ CLUSTER.start();
+ final Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+ producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_OLD, RECORD_KEY, RECORD_VALUE_OLD)).get();
+ producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_NEW, RECORD_KEY, RECORD_VALUE_NEW)).get();
+ }
+ INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 1);
+ }
+
+ @Before
+ public void beforeTest() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.table(INPUT_TOPIC_NAME,
+ Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION)));
+ final Properties configs = new Properties();
+ configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
+ configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
+ configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
+ kafkaStreams = IntegrationTestUtils.getStartedStreams(configs, builder, true);
+ }
+
+ @After
+ public void afterTest() {
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ kafkaStreams.cleanUp();
+ }
+ }
+
+ @AfterClass
+ public static void after() {
+ CLUSTER.stop();
+ }
+
+ @Test
+ public void verifyStore() {
+ // retrieve the latest value
+ shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.empty(), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
+ shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.now()), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
+ shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMP_NEW)), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
+ // retrieve the old value
+ shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMP_OLD)), RECORD_VALUE_OLD, RECORD_TIMESTAMP_OLD);
+ // there is no record for the provided timestamp
+ shouldVerifyGetNull(RECORD_KEY, Instant.ofEpochMilli(RECORD_TIMESTAMP_OLD - 50));
+ // there is no record with this key
+ shouldVerifyGetNull(3, Instant.now());
+ }
+
+ private void shouldHandleVersionedKeyQuery(final Integer key,
+ final Optional<Instant> queryTimestamp,
+ final Integer expectedValue,
+ final Long expectedTimestamp) {
+
+ VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key);
+ if (queryTimestamp.isPresent()) {
+ query = query.asOf(queryTimestamp.get());
+ }
+ final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+ final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ if (result.getOnlyPartitionResult() == null) {
+ throw new AssertionError("The query returned null.");
+ }
+ final QueryResult<VersionedRecord<Integer>> queryResult = result.getOnlyPartitionResult();
+ if (queryResult.isFailure()) {
+ throw new AssertionError(queryResult.toString());
+ }
+ if (queryResult.getResult() == null) {
+ throw new AssertionError("The query returned null.");
+ }
+
+ assertThat(queryResult.isSuccess(), is(true));
+ final VersionedRecord<Integer> result1 = queryResult.getResult();
+ assertThat(result1.value(), is(expectedValue));
+ assertThat(result1.timestamp(), is(expectedTimestamp));
+ assertThat(queryResult.getExecutionInfo(), is(empty()));
+ }
+
+ private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) {
+ VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key);
+ query = query.asOf(queryTimestamp);
+ final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+ final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+ assertThat(result.getOnlyPartitionResult(), nullValue());
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/VersionedKeyQueryTest.java b/streams/src/test/java/org/apache/kafka/streams/query/VersionedKeyQueryTest.java
new file mode 100644
index 00000000000..5f07d3cc70d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/VersionedKeyQueryTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.junit.Test;
+
+public class VersionedKeyQueryTest {
+ @Test
+ public void shouldThrowNPEWithNullKey() {
+ final Exception exception = assertThrows(NullPointerException.class, () -> VersionedKeyQuery.withKey(null));
+ assertEquals("key cannot be null.", exception.getMessage());
+ }
+
+ @Test
+ public void shouldThrowNPEWithNullAsOftimestamp() {
+ final VersionedKeyQuery<Integer, VersionedRecord<Integer>> query = VersionedKeyQuery.withKey(1);
+ final Exception exception = assertThrows(NullPointerException.class, () -> query.asOf(null));
+ assertEquals("asOf timestamp cannot be null.", exception.getMessage());
+ }
+}