You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/11/16 01:35:01 UTC

(kafka) branch trunk updated: KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) (#14596)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0489b7cd331 KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) (#14596)
0489b7cd331 is described below

commit 0489b7cd331b22b5a2912e79e1cfa517ba6cecc9
Author: Alieh <10...@users.noreply.github.com>
AuthorDate: Thu Nov 16 02:34:54 2023 +0100

    KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) (#14596)
    
    This PR implements KIP-960, which adds support for `VersionedKeyQuery`.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../kafka/streams/query/VersionedKeyQuery.java     |  87 +++++++++++
 .../internals/MeteredVersionedKeyValueStore.java   |  80 ++++++++++
 .../state/internals/RocksDBVersionedStore.java     |  27 +++-
 .../streams/state/internals/StoreQueryUtils.java   |  47 ++++++
 .../IQv2VersionedStoreIntegrationTest.java         | 166 +++++++++++++++++++++
 .../kafka/streams/query/VersionedKeyQueryTest.java |  38 +++++
 6 files changed, 444 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java
new file mode 100644
index 00000000000..869b74ba0cf
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/VersionedKeyQuery.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query that looks up a single record of a versioned state store by its key and,
+ * optionally, by a timestamp.
+ * <p>
+ * Instances are immutable: {@link #asOf(Instant)} returns a new query object instead of
+ * modifying the one it is called on.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+@Evolving
+public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {
+
+    private final K key;
+    private final Optional<Instant> asOfTimestamp;
+
+    // Not public: instances are obtained via withKey(...) and refined via asOf(...).
+    private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp) {
+        this.key = key;
+        this.asOfTimestamp = asOfTimestamp;
+    }
+
+    /**
+     * Creates a query that retrieves the record identified by {@code key} from a versioned
+     * state store, if such a record exists (the result is {@code null} otherwise).
+     * <p>
+     * By default the query returns the latest value of the specified {@code key}. Calling
+     * {@link #asOf(Instant)} on the returned query makes it return the value associated with
+     * the given timestamp instead.
+     *
+     * @param key The key to retrieve
+     * @param <K> The type of the key
+     * @param <V> The type of the value that will be retrieved
+     * @return A query for the latest record of {@code key}, with no timestamp set
+     * @throws NullPointerException if {@code key} is null
+     */
+    public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) {
+        return new VersionedKeyQuery<>(
+            Objects.requireNonNull(key, "key cannot be null."),
+            Optional.empty()
+        );
+    }
+
+    /**
+     * Specifies the timestamp for the key query. The key query returns the record's version for
+     * the specified timestamp.
+     * (To be more precise: the key query returns the record with the greatest timestamp &lt;= asOfTimestamp.)
+     *
+     * @param asOfTimestamp The timestamp of the query.
+     * @return A new query for the same key that carries {@code asOfTimestamp}
+     * @throws NullPointerException if {@code asOfTimestamp} is null
+     */
+    public VersionedKeyQuery<K, V> asOf(final Instant asOfTimestamp) {
+        return new VersionedKeyQuery<>(
+            key,
+            Optional.of(Objects.requireNonNull(asOfTimestamp, "asOf timestamp cannot be null."))
+        );
+    }
+
+    /**
+     * The key that was specified for this query.
+     *
+     * @return The specified {@code key} of the query.
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * The timestamp of the query, if specified.
+     *
+     * @return The specified {@code asOfTimestamp} of the query, or an empty {@code Optional}
+     *         if {@link #asOf(Instant)} was never called.
+     */
+    public Optional<Instant> asOfTimestamp() {
+        return asOfTimestamp;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index 8260f1a0bff..d3a054a34e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -16,12 +16,17 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerde;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
+
+import java.util.Map;
 import java.util.Objects;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -29,11 +34,15 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.VersionedKeyQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -41,6 +50,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.VersionedBytesStore;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 
 /**
  * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
@@ -89,6 +99,22 @@ public class MeteredVersionedKeyValueStore<K, V>
         private final Serde<V> plainValueSerde;
         private StateSerdes<K, V> plainValueSerdes;
 
+        // Dispatch table from concrete query class to the method that serves it. The store
+        // argument handed to each handler is ignored because the handlers are instance methods
+        // of this store. NOTE(review): Class and QueryHandler are used as raw types here.
+        private final Map<Class, QueryHandler> queryHandlers =
+            mkMap(
+                mkEntry(
+                    RangeQuery.class,
+                    (rangeQuery, bound, queryConfig, ignoredStore) -> runRangeQuery(rangeQuery, bound, queryConfig)
+                ),
+                mkEntry(
+                    KeyQuery.class,
+                    (keyQuery, bound, queryConfig, ignoredStore) -> runKeyQuery(keyQuery, bound, queryConfig)
+                ),
+                mkEntry(
+                    VersionedKeyQuery.class,
+                    (versionedKeyQuery, bound, queryConfig, ignoredStore) -> runVersionedKeyQuery(versionedKeyQuery, bound, queryConfig)
+                )
+            );
+
         MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
                                               final String metricScope,
                                               final Time time,
@@ -139,6 +165,36 @@ public class MeteredVersionedKeyValueStore<K, V>
             }
         }
 
+        /**
+         * Executes an interactive query against this store.
+         * <p>
+         * The handler is looked up in {@code queryHandlers} by the query's concrete class. If no
+         * handler is registered, the query is forwarded unchanged to the wrapped store. When
+         * {@code config.isCollectExecutionInfo()} is set, a line with the elapsed time is appended
+         * to the result's execution info in either case.
+         *
+         * @param query         the query to execute
+         * @param positionBound the position bound passed through to the handler or wrapped store
+         * @param config        the query configuration
+         * @param <R>           the result type of the query
+         * @return the result produced by the handler or by the wrapped store
+         */
+        @SuppressWarnings("unchecked")
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound, final QueryConfig config) {
+            final long startNs = time.nanoseconds();
+            final QueryHandler handler = queryHandlers.get(query.getClass());
+
+            if (handler == null) {
+                // no typed handling at this layer: let the wrapped store deal with the query
+                final QueryResult<R> forwarded = wrapped().query(query, positionBound, config);
+                if (config.isCollectExecutionInfo()) {
+                    forwarded.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + (time.nanoseconds() - startNs) + "ns");
+                }
+                return forwarded;
+            }
+
+            final QueryResult<R> handled =
+                (QueryResult<R>) handler.apply(query, positionBound, config, this);
+            if (config.isCollectExecutionInfo()) {
+                handled.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (time.nanoseconds() - startNs) + "ns");
+            }
+            return handled;
+        }
+
         @Override
         protected <R> QueryResult<R> runRangeQuery(final Query<R> query,
                                                    final PositionBound positionBound,
@@ -157,6 +213,30 @@ public class MeteredVersionedKeyValueStore<K, V>
             throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time.");
         }
 
+        /**
+         * Serves a {@link VersionedKeyQuery} by translating it into its raw (bytes) form,
+         * running it against the wrapped store and deserializing the value of the returned record.
+         *
+         * @param query         the typed query; cast to {@code VersionedKeyQuery<K, V>}
+         * @param positionBound the position bound forwarded to the wrapped store
+         * @param config        the query configuration forwarded to the wrapped store
+         * @param <R>           the result type of the query
+         * @return the deserialized result if the wrapped store succeeded with a non-null record;
+         *         otherwise the wrapped store's result, unchanged
+         */
+        @SuppressWarnings("unchecked")
+        private <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,
+                                                          final PositionBound positionBound,
+                                                          final QueryConfig config) {
+            final VersionedKeyQuery<K, V> typedKeyQuery = (VersionedKeyQuery<K, V>) query;
+
+            // rebuild the query on the serialized key, carrying over the timestamp if one was set
+            final VersionedKeyQuery<Bytes, byte[]> keyOnlyQuery =
+                VersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+            final VersionedKeyQuery<Bytes, byte[]> rawKeyQuery =
+                typedKeyQuery.asOfTimestamp().map(keyOnlyQuery::asOf).orElse(keyOnlyQuery);
+
+            final QueryResult<VersionedRecord<byte[]>> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+
+            if (!rawResult.isSuccess() || rawResult.getResult() == null) {
+                // the generic type doesn't matter here: a failed query has no result set, and a
+                // successful query without a matching record carries a null result.
+                return (QueryResult<R>) rawResult;
+            }
+
+            final VersionedRecord<V> versionedRecord =
+                StoreQueryUtils.deserializeVersionedRecord(plainValueSerdes, rawResult.getResult());
+            final QueryResult<VersionedRecord<V>> typedQueryResult =
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, versionedRecord);
+            return (QueryResult<R>) typedQueryResult;
+        }
+
         @SuppressWarnings("unchecked")
         @Override
         protected Serde<ValueAndTimestamp<V>> prepareValueSerdeForStore(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index e33988b971c..c67d9a2fb5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -41,6 +41,10 @@ import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapt
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
@@ -128,13 +132,17 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         }
         observedStreamTime = Math.max(observedStreamTime, timestamp);
 
-        return doPut(
+        final long foundTs = doPut(
             versionedStoreClient,
             observedStreamTime,
             key,
             value,
             timestamp
         );
+
+        StoreQueryUtils.updatePosition(position, stateStoreContext);
+
+        return foundTs;
     }
 
     @Override
@@ -159,6 +167,8 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
             timestamp
         );
 
+        StoreQueryUtils.updatePosition(position, stateStoreContext);
+
         return existingRecord;
     }
 
@@ -274,6 +284,21 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         // same physical RocksDB instance
     }
 
+    /**
+     * Executes an interactive query against this store by delegating to
+     * {@code StoreQueryUtils.handleBasicQueries}, passing along this store together with its
+     * current {@code position} and {@code stateStoreContext}.
+     *
+     * @param query         the query to execute
+     * @param positionBound the position bound of the query
+     * @param config        the query configuration
+     * @param <R>           the result type of the query
+     * @return whatever {@code StoreQueryUtils.handleBasicQueries} returns for this store
+     */
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config) {
+        return StoreQueryUtils.handleBasicQueries(query, positionBound, config, this, position, stateStoreContext);
+    }
+
     @Override
     public boolean persistent() {
         return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index f8d9df6e90f..3278245f11b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -33,12 +33,15 @@ import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.VersionedKeyQuery;
 import org.apache.kafka.streams.query.WindowKeyQuery;
 import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
@@ -90,6 +93,10 @@ public final class StoreQueryUtils {
             mkEntry(
                 WindowRangeQuery.class,
                 StoreQueryUtils::runWindowRangeQuery
+            ),
+            mkEntry(
+                VersionedKeyQuery.class,
+                StoreQueryUtils::runVersionedKeyQuery
             )
         );
 
@@ -335,6 +342,38 @@ public final class StoreQueryUtils {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,
+                                                           final PositionBound positionBound,
+                                                           final QueryConfig config,
+                                                           final StateStore store) {
+        if (store instanceof VersionedKeyValueStore) {
+            final VersionedKeyValueStore<Bytes, byte[]> versionedKeyValueStore =
+                (VersionedKeyValueStore<Bytes, byte[]>) store;
+            final VersionedKeyQuery<Bytes, byte[]> rawKeyQuery =
+                (VersionedKeyQuery<Bytes, byte[]>) query;
+            try {
+                final VersionedRecord<byte[]> bytes;
+                if (((VersionedKeyQuery<?, ?>) query).asOfTimestamp().isPresent()) {
+                    bytes = versionedKeyValueStore.get(rawKeyQuery.key(),
+                        ((VersionedKeyQuery<?, ?>) query).asOfTimestamp().get().toEpochMilli());
+                } else {
+                    bytes = versionedKeyValueStore.get(rawKeyQuery.key());
+                }
+                return (QueryResult<R>) QueryResult.forResult(bytes);
+            } catch (final Exception e) {
+                final String message = parseStoreException(e, store, query);
+                return QueryResult.forFailure(
+                    FailureReason.STORE_EXCEPTION,
+                    message
+                );
+            }
+
+        } else {
+            return QueryResult.forUnknownQueryType(query, store);
+        }
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes,
                                                               final StateStore wrapped) {
@@ -351,6 +390,14 @@ public final class StoreQueryUtils {
         return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
     }
 
+    /**
+     * Converts a raw versioned record into a typed one.
+     * <p>
+     * The value bytes are deserialized with the value deserializer of {@code serdes}, using
+     * {@code serdes.topic()} as the topic; the timestamp is carried over unchanged.
+     *
+     * @param serdes             the serdes providing the value deserializer and topic
+     * @param rawVersionedRecord the record holding the serialized value and its timestamp
+     * @param <V>                the type of the deserialized value
+     * @return a new record with the deserialized value and the original timestamp
+     */
+    public static <V> VersionedRecord<V> deserializeVersionedRecord(final StateSerdes<?, V> serdes,
+                                                                    final VersionedRecord<byte[]> rawVersionedRecord) {
+        final Deserializer<V> deserializer = serdes.valueDeserializer();
+        final long recordTimestamp = rawVersionedRecord.timestamp();
+        return new VersionedRecord<>(
+            deserializer.deserialize(serdes.topic(), rawVersionedRecord.value()),
+            recordTimestamp
+        );
+    }
+
     public static void checkpointPosition(final OffsetCheckpoint checkpointFile,
                                           final Position position) {
         try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
new file mode 100644
index 00000000000..339a62dd74b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.query.VersionedKeyQuery;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Integration test that runs {@link VersionedKeyQuery} interactive queries against a persistent
+ * versioned key-value store that materializes a table read from {@code INPUT_TOPIC_NAME}.
+ * Two versions of the same key are produced once, before any test runs.
+ */
+@Category({IntegrationTest.class})
+public class IQv2VersionedStoreIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String STORE_NAME = "versioned-store";
+    private static final Duration HISTORY_RETENTION = Duration.ofDays(1);
+    private static final Instant BASE_TIMESTAMP = Instant.parse("2023-01-01T10:00:00.00Z");
+    // timestamps and values of the two versions written for RECORD_KEY
+    private static final Long RECORD_TIMESTAMP_OLD = BASE_TIMESTAMP.toEpochMilli();
+    private static final Long RECORD_TIMESTAMP_NEW = RECORD_TIMESTAMP_OLD + 100;
+    private static final int RECORD_KEY = 2;
+    private static final int RECORD_VALUE_OLD = 2;
+    private static final int RECORD_VALUE_NEW = 3;
+    // position bound used by every query; filled in once the input records are produced
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
+        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
+    private KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void before() throws Exception {
+        CLUSTER.start();
+
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        // write the old and the new version of the same key to partition 0, waiting for each send
+        try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerConfig)) {
+            producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_OLD, RECORD_KEY, RECORD_VALUE_OLD)).get();
+            producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_NEW, RECORD_KEY, RECORD_VALUE_NEW)).get();
+        }
+
+        // presumably offset 1 is the offset of the second record written above
+        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 1);
+    }
+
+    @Before
+    public void beforeTest() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(INPUT_TOPIC_NAME,
+            Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION)));
+
+        final Properties streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
+        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
+
+        kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
+    }
+
+    @After
+    public void afterTest() {
+        if (kafkaStreams == null) {
+            return;
+        }
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Test
+    public void verifyStore() {
+        // retrieve the latest value
+        shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.empty(), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
+        shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.now()), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
+        shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMP_NEW)), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
+        // retrieve the old value
+        shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMP_OLD)), RECORD_VALUE_OLD, RECORD_TIMESTAMP_OLD);
+        // there is no record for the provided timestamp
+        shouldVerifyGetNull(RECORD_KEY, Instant.ofEpochMilli(RECORD_TIMESTAMP_OLD - 50));
+        // there is no record with this key
+        shouldVerifyGetNull(3, Instant.now());
+    }
+
+    // Runs the query against STORE_NAME, bounded by INPUT_POSITION, and waits for the result.
+    private StateQueryResult<VersionedRecord<Integer>> runQuery(final VersionedKeyQuery<Integer, Integer> query) {
+        final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest
+            .inStore(STORE_NAME)
+            .withQuery(query)
+            .withPositionBound(PositionBound.at(INPUT_POSITION));
+        return IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+    }
+
+    // Queries the key (as of the timestamp, if given) and verifies the returned value and timestamp.
+    private void shouldHandleVersionedKeyQuery(final Integer key,
+                                               final Optional<Instant> queryTimestamp,
+                                               final Integer expectedValue,
+                                               final Long expectedTimestamp) {
+
+        final VersionedKeyQuery<Integer, Integer> keyOnlyQuery = VersionedKeyQuery.withKey(key);
+        final VersionedKeyQuery<Integer, Integer> query =
+            queryTimestamp.map(keyOnlyQuery::asOf).orElse(keyOnlyQuery);
+
+        final QueryResult<VersionedRecord<Integer>> queryResult = runQuery(query).getOnlyPartitionResult();
+        if (queryResult == null) {
+            throw new AssertionError("The query returned null.");
+        }
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        if (queryResult.getResult() == null) {
+            throw new AssertionError("The query returned null.");
+        }
+
+        assertThat(queryResult.isSuccess(), is(true));
+        final VersionedRecord<Integer> record = queryResult.getResult();
+        assertThat(record.value(), is(expectedValue));
+        assertThat(record.timestamp(), is(expectedTimestamp));
+        assertThat(queryResult.getExecutionInfo(), is(empty()));
+    }
+
+    // Queries the key as of the timestamp and verifies that no partition result is returned.
+    private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) {
+        final VersionedKeyQuery<Integer, Integer> query =
+            VersionedKeyQuery.<Integer, Integer>withKey(key).asOf(queryTimestamp);
+        assertThat(runQuery(query).getOnlyPartitionResult(), nullValue());
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/VersionedKeyQueryTest.java b/streams/src/test/java/org/apache/kafka/streams/query/VersionedKeyQueryTest.java
new file mode 100644
index 00000000000..5f07d3cc70d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/VersionedKeyQueryTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.junit.Test;
+
+/**
+ * Unit tests for the argument validation of {@link VersionedKeyQuery}.
+ */
+public class VersionedKeyQueryTest {
+    // withKey(null) must be rejected with the documented message
+    @Test
+    public void shouldThrowNPEWithNullKey() {
+        final NullPointerException thrown =
+            assertThrows(NullPointerException.class, () -> VersionedKeyQuery.withKey(null));
+        assertEquals("key cannot be null.", thrown.getMessage());
+    }
+
+    // asOf(null) must be rejected with the documented message
+    @Test
+    public void shouldThrowNPEWithNullAsOftimestamp() {
+        // NOTE(review): V is the plain value type elsewhere, so VersionedKeyQuery<Integer, Integer>
+        // looks like the intended type here; harmless for this null check - confirm.
+        final VersionedKeyQuery<Integer, VersionedRecord<Integer>> query = VersionedKeyQuery.withKey(1);
+        final NullPointerException thrown =
+            assertThrows(NullPointerException.class, () -> query.asOf(null));
+        assertEquals("asOf timestamp cannot be null.", thrown.getMessage());
+    }
+}