You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/16 15:16:23 UTC

[kafka] branch trunk updated: KAFKA-3522: Add TopologyTestDriver unit tests (#6179)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 16b4088  KAFKA-3522: Add TopologyTestDriver unit tests (#6179)
16b4088 is described below

commit 16b408898e75b00ddf6b607246833cdbcd56f507
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu May 16 16:16:00 2019 +0100

    KAFKA-3522: Add TopologyTestDriver unit tests (#6179)
    
    Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../org/apache/kafka/streams/state/Stores.java     |  41 +-
 .../integration/StoreUpgradeIntegrationTest.java   | 995 +++++++++++++++++++++
 .../kafka/streams/TopologyTestDriverTest.java      | 196 +++-
 3 files changed, 1188 insertions(+), 44 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index e40251d..2f81fd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -354,26 +354,6 @@ public final class Stores {
     }
 
     /**
-     * Create an in-memory {@link SessionBytesStoreSupplier}.
-     * @param name              name of the store (cannot be {@code null})
-     * @param retentionPeriod   length ot time to retain data in the store (cannot be negative)
-     *                          Note that the retention period must be at least long enough to contain the
-     *                          windowed data's entire life cycle, from window-start through window-end,
-     *                          and for the entire grace period.
-     * @return an instance of a {@link  SessionBytesStoreSupplier}
-     */
-    public static SessionBytesStoreSupplier inMemorySessionStore(final String name, final Duration retentionPeriod) {
-        Objects.requireNonNull(name, "name cannot be null");
-
-        final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        final long retentionPeriodMs = ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
-        if (retentionPeriodMs < 0) {
-            throw new IllegalArgumentException("retentionPeriod cannot be negative");
-        }
-        return new InMemorySessionBytesStoreSupplier(name, retentionPeriodMs);
-    }
-
-    /**
      * Create a persistent {@link SessionBytesStoreSupplier}.
      *
      * @param name              name of the store (cannot be {@code null})
@@ -412,6 +392,27 @@ public final class Stores {
     }
 
     /**
+     * Create an in-memory {@link SessionBytesStoreSupplier}.
+     *
+     * @param name              name of the store (cannot be {@code null})
+     * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
+     *                          Note that the retention period must be at least long enough to contain the
+     *                          windowed data's entire life cycle, from window-start through window-end,
+     *                          and for the entire grace period.
+     * @return an instance of a {@link  SessionBytesStoreSupplier}
+     */
+    public static SessionBytesStoreSupplier inMemorySessionStore(final String name, final Duration retentionPeriod) {
+        Objects.requireNonNull(name, "name cannot be null");
+
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionPeriodMs = ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
+        if (retentionPeriodMs < 0) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        return new InMemorySessionBytesStoreSupplier(name, retentionPeriodMs);
+    }
+
+    /**
      * Creates a {@link StoreBuilder} that can be used to build a {@link KeyValueStore}.
      * <p>
      * The provided supplier should <strong>not</strong> be a supplier for
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
new file mode 100644
index 0000000..ba5b08f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -0,0 +1,995 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+@Category({IntegrationTest.class})
+public class StoreUpgradeIntegrationTest {
+    private static String inputStream;
+    private static final String STORE_NAME = "store";
+
+    private KafkaStreams kafkaStreams;
+    private static int testCounter = 0;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Before
+    public void createTopics() throws Exception {
+        inputStream = "input-stream-" + testCounter;
+        CLUSTER.createTopic(inputStream);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "addId-" + testCounter++);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void shouldMigrateInMemoryKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
+        shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(false);
+    }
+
+    @Test
+    public void shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
+        shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(true);
+    }
+
+    private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final boolean persistentStore) throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore.addStateStore(
+            Stores.keyValueStoreBuilder(
+                persistentStore ? Stores.persistentKeyValueStore(STORE_NAME) : Stores.inMemoryKeyValueStore(STORE_NAME),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyPlainCount(1, singletonList(KeyValue.pair(1, 1L)));
+
+        processKeyValueAndVerifyPlainCount(1, singletonList(KeyValue.pair(1, 2L)));
+        final long lastUpdateKeyOne = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L;
+
+        processKeyValueAndVerifyPlainCount(2, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L)));
+        final long lastUpdateKeyTwo = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L;
+
+        processKeyValueAndVerifyPlainCount(3, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L)));
+        final long lastUpdateKeyThree = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L;
+
+        processKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L),
+            KeyValue.pair(4, 1L)));
+
+        processKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L),
+            KeyValue.pair(4, 2L)));
+
+        processKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L),
+            KeyValue.pair(4, 3L)));
+        final long lastUpdateKeyFour = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L;
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+            Stores.timestampedKeyValueStoreBuilder(
+                persistentStore ? Stores.persistentTimestampedKeyValueStore(STORE_NAME) : Stores.inMemoryKeyValueStore(STORE_NAME),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
+        kafkaStreams.start();
+
+        verifyCountWithTimestamp(1, 2L, lastUpdateKeyOne);
+        verifyCountWithTimestamp(2, 1L, lastUpdateKeyTwo);
+        verifyCountWithTimestamp(3, 1L, lastUpdateKeyThree);
+        verifyCountWithTimestamp(4, 3L, lastUpdateKeyFour);
+
+        final long currentTime = CLUSTER.time.milliseconds();
+        processKeyValueAndVerifyCountWithTimestamp(1, currentTime + 42L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(1L, lastUpdateKeyTwo)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(4, ValueAndTimestamp.make(3L, lastUpdateKeyFour))));
+
+        processKeyValueAndVerifyCountWithTimestamp(2, currentTime + 45L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(2L, currentTime + 45L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(4, ValueAndTimestamp.make(3L, lastUpdateKeyFour))));
+
+        // can process "out of order" record for different key
+        processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 21L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(2L, currentTime + 45L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(4, ValueAndTimestamp.make(4L, currentTime + 21L))));
+
+        processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 42L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(2L, currentTime + 45L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(4, ValueAndTimestamp.make(5L, currentTime + 42L))));
+
+        // out of order (same key) record should not reduce result timestamp
+        processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 10L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(2L, currentTime + 45L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(4, ValueAndTimestamp.make(6L, currentTime + 42L))));
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore.addStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore(STORE_NAME),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyPlainCount(1, singletonList(KeyValue.pair(1, 1L)));
+
+        processKeyValueAndVerifyPlainCount(1, singletonList(KeyValue.pair(1, 2L)));
+
+        processKeyValueAndVerifyPlainCount(2, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L)));
+
+        processKeyValueAndVerifyPlainCount(3, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L)));
+
+        processKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L),
+            KeyValue.pair(4, 1L)));
+
+        processKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L),
+            KeyValue.pair(4, 2L)));
+
+        processKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(1, 2L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L),
+            KeyValue.pair(4, 3L)));
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+            Stores.timestampedKeyValueStoreBuilder(
+                Stores.persistentKeyValueStore(STORE_NAME),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
+        kafkaStreams.start();
+
+        verifyCountWithSurrogateTimestamp(1, 2L);
+        verifyCountWithSurrogateTimestamp(2, 1L);
+        verifyCountWithSurrogateTimestamp(3, 1L);
+        verifyCountWithSurrogateTimestamp(4, 3L);
+
+        processKeyValueAndVerifyCount(1, 42L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(4, ValueAndTimestamp.make(3L, -1L))));
+
+        processKeyValueAndVerifyCount(2, 45L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(4, ValueAndTimestamp.make(3L, -1L))));
+
+        // can process "out of order" record for different key
+        processKeyValueAndVerifyCount(4, 21L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(4, ValueAndTimestamp.make(4L, -1L))));
+
+        processKeyValueAndVerifyCount(4, 42L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(4, ValueAndTimestamp.make(5L, -1L))));
+
+        // out of order (same key) record should not reduce result timestamp
+        processKeyValueAndVerifyCount(4, 10L, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)),
+            KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(4, ValueAndTimestamp.make(6L, -1L))));
+
+        kafkaStreams.close();
+    }
+
+    private <K, V> void processKeyValueAndVerifyPlainCount(final K key,
+                                                           final List<KeyValue<Integer, Object>> expectedStoreContent)
+            throws Exception {
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            inputStream,
+            singletonList(KeyValue.pair(key, 0)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class),
+            CLUSTER.time);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyKeyValueStore<K, V> store =
+                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
+                try (final KeyValueIterator<K, V> all = store.all()) {
+                    final List<KeyValue<K, V>> storeContent = new LinkedList<>();
+                    while (all.hasNext()) {
+                        storeContent.add(all.next());
+                    }
+                    return storeContent.equals(expectedStoreContent);
+                }
+            } catch (final Exception swallow) {
+                swallow.printStackTrace();
+                System.err.println(swallow.getMessage());
+                return false;
+            }
+        }, "Could not get expected result in time.");
+    }
+
+    private <K> void verifyCountWithTimestamp(final K key,
+                                              final long value,
+                                              final long timestamp) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
+                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+                final ValueAndTimestamp<Long> count = store.get(key);
+                return count.value() == value && count.timestamp() == timestamp;
+            } catch (final Exception swallow) {
+                swallow.printStackTrace();
+                System.err.println(swallow.getMessage());
+                return false;
+            }
+        }, "Could not get expected result in time.");
+    }
+
+    private <K> void verifyCountWithSurrogateTimestamp(final K key,
+                                                       final long value) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
+                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+                final ValueAndTimestamp<Long> count = store.get(key);
+                return count.value() == value && count.timestamp() == -1L;
+            } catch (final Exception swallow) {
+                swallow.printStackTrace();
+                System.err.println(swallow.getMessage());
+                return false;
+            }
+        }, "Could not get expected result in time.");
+    }
+
+    private <K, V> void processKeyValueAndVerifyCount(final K key,
+                                                      final long timestamp,
+                                                      final List<KeyValue<Integer, Object>> expectedStoreContent)
+            throws Exception {
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, 0)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class),
+            timestamp);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
+                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+                try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
+                    final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
+                    while (all.hasNext()) {
+                        storeContent.add(all.next());
+                    }
+                    return storeContent.equals(expectedStoreContent);
+                }
+            } catch (final Exception swallow) {
+                swallow.printStackTrace();
+                System.err.println(swallow.getMessage());
+                return false;
+            }
+        }, "Could not get expected result in time.");
+    }
+
+    private <K, V> void processKeyValueAndVerifyCountWithTimestamp(final K key,
+                                                                   final long timestamp,
+                                                                   final List<KeyValue<Integer, Object>> expectedStoreContent)
+        throws Exception {
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, 0)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class),
+            timestamp);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
+                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+                try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
+                    final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
+                    while (all.hasNext()) {
+                        storeContent.add(all.next());
+                    }
+                    return storeContent.equals(expectedStoreContent);
+                }
+            } catch (final Exception swallow) {
+                swallow.printStackTrace();
+                System.err.println(swallow.getMessage());
+                return false;
+            }
+        }, "Could not get expected result in time.");
+    }
+
+    @Test
+    public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+        streamsBuilderForOldStore
+            .addStateStore(
+                Stores.windowStoreBuilder(
+                    Stores.inMemoryWindowStore(
+                        STORE_NAME,
+                        Duration.ofMillis(1000L),
+                        Duration.ofMillis(1000L),
+                        false),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(WindowedProcessor::new, STORE_NAME);
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+        streamsBuilderForNewStore
+            .addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    Stores.inMemoryWindowStore(
+                        STORE_NAME,
+                        Duration.ofMillis(1000L),
+                        Duration.ofMillis(1000L),
+                        false),
+            Serdes.Integer(),
+            Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(TimestampedWindowedProcessor::new, STORE_NAME);
+
+
+        shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(
+            new KafkaStreams(streamsBuilderForOldStore.build(), props()),
+            new KafkaStreams(streamsBuilderForNewStore.build(), props()),
+            false);
+    }
+
+    @Test
+    public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore
+            .addStateStore(
+                Stores.windowStoreBuilder(
+                    Stores.persistentWindowStore(
+                        STORE_NAME,
+                        Duration.ofMillis(1000L),
+                        Duration.ofMillis(1000L),
+                        false),
+                    Serdes.Integer(),
+                    Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(WindowedProcessor::new, STORE_NAME);
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+        streamsBuilderForNewStore
+            .addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    Stores.persistentTimestampedWindowStore(
+                        STORE_NAME,
+                        Duration.ofMillis(1000L),
+                        Duration.ofMillis(1000L),
+                        false),
+                    Serdes.Integer(),
+                    Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(TimestampedWindowedProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(
+            new KafkaStreams(streamsBuilderForOldStore.build(), props),
+            new KafkaStreams(streamsBuilderForNewStore.build(), props),
+            true);
+    }
+
+    private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final KafkaStreams kafkaStreamsOld,
+                                                                           final KafkaStreams kafkaStreamsNew,
+                                                                           final boolean persistentStore) throws Exception {
+        kafkaStreams = kafkaStreamsOld;
+        kafkaStreams.start();
+
+        processWindowedKeyValueAndVerifyPlainCount(1, singletonList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 1L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(1, singletonList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L)));
+        final long lastUpdateKeyOne = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L;
+
+        processWindowedKeyValueAndVerifyPlainCount(2, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L)));
+        final long lastUpdateKeyTwo = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L;
+
+        processWindowedKeyValueAndVerifyPlainCount(3, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L)));
+        final long lastUpdateKeyThree = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L;
+
+        processWindowedKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 1L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 2L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 3L)));
+        final long lastUpdateKeyFour = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L;
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+
+        kafkaStreams = kafkaStreamsNew;
+        kafkaStreams.start();
+
+        verifyWindowedCountWithTimestamp(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L, lastUpdateKeyOne);
+        verifyWindowedCountWithTimestamp(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L, lastUpdateKeyTwo);
+        verifyWindowedCountWithTimestamp(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L, lastUpdateKeyThree);
+        verifyWindowedCountWithTimestamp(new Windowed<>(4, new TimeWindow(0L, 1000L)), 3L, lastUpdateKeyFour);
+
+        final long currentTime = CLUSTER.time.milliseconds();
+        processKeyValueAndVerifyWindowedCountWithTimestamp(1, currentTime + 42L, asList(
+            KeyValue.pair(
+                new Windowed<>(1, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(
+                new Windowed<>(2, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(1L, lastUpdateKeyTwo)),
+            KeyValue.pair(
+                new Windowed<>(3, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(
+                new Windowed<>(4, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(3L, lastUpdateKeyFour))));
+
+        processKeyValueAndVerifyWindowedCountWithTimestamp(2, currentTime + 45L, asList(
+            KeyValue.pair(
+                new Windowed<>(1, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(
+                new Windowed<>(2, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(2L, currentTime + 45L)),
+            KeyValue.pair(
+                new Windowed<>(3, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(
+                new Windowed<>(4, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(3L, lastUpdateKeyFour))));
+
+        // can process "out of order" record for different key
+        processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 21L, asList(
+            KeyValue.pair(
+                new Windowed<>(1, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(
+                new Windowed<>(2, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(2L, currentTime + 45L)),
+            KeyValue.pair(
+                new Windowed<>(3, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(
+                new Windowed<>(4, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(4L, currentTime + 21L))));
+
+        processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 42L, asList(
+            KeyValue.pair(
+                new Windowed<>(1, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(
+                new Windowed<>(2, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(2L, currentTime + 45L)),
+            KeyValue.pair(
+                new Windowed<>(3, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(
+                new Windowed<>(4, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(5L, currentTime + 42L))));
+
+        // out of order (same key) record should not reduce result timestamp
+        processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 10L, asList(
+            KeyValue.pair(
+                new Windowed<>(1, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(3L, currentTime + 42L)),
+            KeyValue.pair(
+                new Windowed<>(2, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(2L, currentTime + 45L)),
+            KeyValue.pair(
+                new Windowed<>(3, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(1L, lastUpdateKeyThree)),
+            KeyValue.pair(
+                new Windowed<>(4, new TimeWindow(0L, 1000L)),
+                ValueAndTimestamp.make(6L, currentTime + 42L))));
+
+        // test new segment
+        processKeyValueAndVerifyWindowedCountWithTimestamp(10, currentTime + 100001L, singletonList(
+            KeyValue.pair(
+                new Windowed<>(10, new TimeWindow(100000L, 101000L)), ValueAndTimestamp.make(1L, currentTime + 100001L))));
+
+
+        kafkaStreams.close();
+    }
+    // Writes counts via a plain window store, then reopens STORE_NAME via timestampedWindowStoreBuilder over persistentWindowStore; reads must carry timestamp -1.
+    @Test
+    public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+        // first run: plain windowStoreBuilder, records counted by WindowedProcessor
+        streamsBuilderForOldStore.addStateStore(
+            Stores.windowStoreBuilder(
+                Stores.persistentWindowStore(
+                    STORE_NAME,
+                    Duration.ofMillis(1000L),
+                    Duration.ofMillis(1000L),
+                    false),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(WindowedProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
+        kafkaStreams.start();
+        // each call sends one record for the given key and waits until the store content equals the given list
+        processWindowedKeyValueAndVerifyPlainCount(1, singletonList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 1L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(1, singletonList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(2, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(3, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 1L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 2L)));
+
+        processWindowedKeyValueAndVerifyPlainCount(4, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 3L)));
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // second run: same STORE_NAME and props, but built with timestampedWindowStoreBuilder and TimestampedWindowedProcessor
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+            Stores.timestampedWindowStoreBuilder(
+                Stores.persistentWindowStore(
+                    STORE_NAME,
+                    Duration.ofMillis(1000L),
+                    Duration.ofMillis(1000L),
+                    false),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(TimestampedWindowedProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
+        kafkaStreams.start();
+        // counts from the first run must be visible, each with timestamp -1
+        verifyWindowedCountWithSurrogateTimestamp(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L);
+        verifyWindowedCountWithSurrogateTimestamp(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L);
+        verifyWindowedCountWithSurrogateTimestamp(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L);
+        verifyWindowedCountWithSurrogateTimestamp(new Windowed<>(4, new TimeWindow(0L, 1000L)), 3L);
+        // the expected content keeps timestamp -1 for every entry, whatever record timestamp (42, 45, 21, ...) is produced
+        processKeyValueAndVerifyWindowedCountWithTimestamp(1, 42L, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L))));
+
+        processKeyValueAndVerifyWindowedCountWithTimestamp(2, 45L, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L))));
+
+        // can process "out of order" record for different key
+        processKeyValueAndVerifyWindowedCountWithTimestamp(4, 21L, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(4L, -1L))));
+
+        processKeyValueAndVerifyWindowedCountWithTimestamp(4, 42L, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(5L, -1L))));
+
+        // out of order (same key) record should not reduce result timestamp
+        processKeyValueAndVerifyWindowedCountWithTimestamp(4, 10L, asList(
+            KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)),
+            KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)),
+            KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)),
+            KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(6L, -1L))));
+
+        // test new segment
+        processKeyValueAndVerifyWindowedCountWithTimestamp(10, 100001L, singletonList(
+            KeyValue.pair(new Windowed<>(10, new TimeWindow(100000L, 101000L)), ValueAndTimestamp.make(1L, -1L))));
+
+
+        kafkaStreams.close();
+    }
+    // Sends one (key, 0) record to inputStream, then waits until the window store's full content equals expectedStoreContent.
+    private <K, V> void processWindowedKeyValueAndVerifyPlainCount(final K key,
+                                                                   final List<KeyValue<Windowed<Integer>, Object>> expectedStoreContent)
+            throws Exception {
+
+        // the produced value is always 0
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            inputStream,
+            singletonList(KeyValue.pair(key, 0)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class),
+            CLUSTER.time);
+
+        // the condition reads the whole store on each evaluation and compares it with the expected list
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<K, V> windowStore =
+                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.windowStore());
+                final List<KeyValue<Windowed<K>, V>> actualContent = new LinkedList<>();
+                // try-with-resources closes the store iterator once the comparison is done
+                try (final KeyValueIterator<Windowed<K>, V> iterator = windowStore.all()) {
+                    iterator.forEachRemaining(actualContent::add);
+                    return actualContent.equals(expectedStoreContent);
+                }
+            } catch (final Exception e) {
+                // any failure is printed to stderr and reported as "condition not met"
+                e.printStackTrace();
+                System.err.println(e.getMessage());
+                return false;
+            }
+        }, "Could not get expected result in time.");
+    }
+
+    /**
+     * Waits until the timestamped window store returns {@code value} for {@code key}'s window start
+     * together with timestamp {@code -1} (the "surrogate" timestamp in this method's name).
+     *
+     * <p>Delegates to {@link #verifyWindowedCountWithTimestamp(Windowed, long, long)} rather than
+     * repeating its polling logic; the two methods differed only in the expected timestamp.
+     *
+     * @param key   windowed key; the lookup uses its key and its window start
+     * @param value expected count
+     * @throws Exception whatever the delegate throws
+     */
+    private <K> void verifyWindowedCountWithSurrogateTimestamp(final Windowed<K> key,
+                                                               final long value) throws Exception {
+        verifyWindowedCountWithTimestamp(key, value, -1L);
+    }
+    // Waits until the timestamped window store holds (value, timestamp) for key, looked up by key and window start.
+    private <K> void verifyWindowedCountWithTimestamp(final Windowed<K> key,
+                                                      final long value,
+                                                      final long timestamp) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
+                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
+                final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
+                // a missing entry means "not there yet": check for null instead of relying on a caught NullPointerException
+                return count != null && count.value() == value && count.timestamp() == timestamp;
+            } catch (final Exception e) {
+                e.printStackTrace(); // the printed trace already contains the exception message
+                return false;
+            }
+        }, "Could not get expected result in time.");
+    }
+    // Sends one (key, 0) record with the given timestamp, then waits until the timestamped window store equals expectedStoreContent.
+    private <K, V> void processKeyValueAndVerifyWindowedCountWithTimestamp(final K key,
+                                                                           final long timestamp,
+                                                                           final List<KeyValue<Windowed<Integer>, Object>> expectedStoreContent)
+            throws Exception {
+
+        // the produced value is always 0; the record timestamp is passed explicitly
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, 0)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class),
+            timestamp);
+
+        // the condition reads the whole store on each evaluation and compares it with the expected list
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> timestampedStore =
+                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
+                final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> actualContent = new LinkedList<>();
+                // try-with-resources closes the store iterator once the comparison is done
+                try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> iterator = timestampedStore.all()) {
+                    iterator.forEachRemaining(actualContent::add);
+                    return actualContent.equals(expectedStoreContent);
+                }
+            } catch (final Exception e) {
+                // any failure is printed to stderr and reported as "condition not met"
+                e.printStackTrace();
+                System.err.println(e.getMessage());
+                return false;
+            }
+        }, "Could not get expected result in time.");
+    }
+
+    /**
+     * Counts records per key in the key-value store registered as STORE_NAME; the record value is ignored.
+     */
+    private static class KeyValueProcessor implements Processor<Integer, Integer> {
+        private KeyValueStore<Integer, Long> store;
+
+        /** Fetches the store registered under STORE_NAME from the context. */
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE_NAME);
+        }
+
+        /** Stores 1 for a key without an entry, otherwise the previous count plus one. */
+        @Override
+        public void process(final Integer key, final Integer value) {
+            final Long previous = store.get(key);
+            final long updated = previous == null ? 1L : previous + 1L;
+            store.put(key, updated);
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+    }
+
+    /**
+     * Counts records per key in the timestamped key-value store registered as STORE_NAME. Each count
+     * is stored with the larger of the previously stored timestamp and context.timestamp().
+     */
+    private static class TimestampedKeyValueProcessor implements Processor<Integer, Integer> {
+        private ProcessorContext context;
+        private TimestampedKeyValueStore<Integer, Long> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = context;
+            store = (TimestampedKeyValueStore<Integer, Long>) context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Integer key, final Integer value) {
+            final ValueAndTimestamp<Long> previous = store.get(key);
+            if (previous == null) {
+                // no entry yet: count 1, stamped with context.timestamp()
+                store.put(key, ValueAndTimestamp.make(1L, context.timestamp()));
+                return;
+            }
+            final long count = previous.value() + 1L;
+            // the stored timestamp never decreases
+            final long timestamp = Math.max(previous.timestamp(), context.timestamp());
+            store.put(key, ValueAndTimestamp.make(count, timestamp));
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    /**
+     * Counts records per key in the window store registered as STORE_NAME. Keys below 10 use window
+     * start 0, all other keys use window start 100000; the record value is ignored.
+     */
+    private static class WindowedProcessor implements Processor<Integer, Integer> {
+        private WindowStore<Integer, Long> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            store = (WindowStore<Integer, Long>) context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Integer key, final Integer value) {
+            // the window start is derived from the key, not from the record timestamp
+            final long windowStart = key < 10 ? 0L : 100000L;
+            final Long previous = store.fetch(key, windowStart);
+            final long updated = previous == null ? 1L : previous + 1L;
+            store.put(key, updated, windowStart);
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    /**
+     * Counts records per key in the timestamped window store registered as STORE_NAME. Keys below 10
+     * use window start 0, all other keys 100000; the stored timestamp never decreases.
+     */
+    private static class TimestampedWindowedProcessor implements Processor<Integer, Integer> {
+        private ProcessorContext context;
+        private TimestampedWindowStore<Integer, Long> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = context;
+            store = (TimestampedWindowStore<Integer, Long>) context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Integer key, final Integer value) {
+            final long windowStart = key < 10 ? 0L : 100000L;
+            final ValueAndTimestamp<Long> previous = store.fetch(key, windowStart);
+            if (previous == null) {
+                store.put(key, ValueAndTimestamp.make(1L, context.timestamp()), windowStart);
+                return;
+            }
+            // existing entry: bump the count and keep the larger of the stored and the current timestamp
+            final long count = previous.value() + 1L;
+            final long timestamp = Math.max(previous.timestamp(), context.timestamp());
+            store.put(key, ValueAndTimestamp.make(count, timestamp), windowStart);
+        }
+
+        @Override
+        public void close() {}
+    }
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d577cf7..2394203 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -78,6 +77,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -465,23 +465,17 @@ public class TopologyTestDriverTest {
         topology.addSink(
             "sink",
             SINK_TOPIC_1,
-            new Serializer<Object>() {
-                @Override
-                public byte[] serialize(final String topic, final Object data) {
-                    if (data instanceof Long) {
-                        return Serdes.Long().serializer().serialize(topic, (Long) data);
-                    }
-                    return Serdes.Integer().serializer().serialize(topic, (Integer) data);
+            (topic, data) -> {
+                if (data instanceof Long) {
+                    return Serdes.Long().serializer().serialize(topic, (Long) data);
                 }
+                return Serdes.Integer().serializer().serialize(topic, (Integer) data);
             },
-            new Serializer<Object>() {
-                @Override
-                public byte[] serialize(final String topic, final Object data) {
-                    if (data instanceof String) {
-                        return Serdes.String().serializer().serialize(topic, (String) data);
-                    }
-                    return Serdes.Double().serializer().serialize(topic, (Double) data);
+            (topic, data) -> {
+                if (data instanceof String) {
+                    return Serdes.String().serializer().serialize(topic, (String) data);
                 }
+                return Serdes.Double().serializer().serialize(topic, (Double) data);
             },
             processor);
 
@@ -736,6 +730,160 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void shouldReturnCorrectPersistentStoreTypeOnly() {
+        shouldReturnCorrectStoreTypeOnly(true); // true: stores are built from the Stores.persistent* suppliers
+    }
+
+    @Test
+    public void shouldReturnCorrectInMemoryStoreTypeOnly() {
+        shouldReturnCorrectStoreTypeOnly(false); // false: stores are built from the Stores.inMemory* suppliers
+    }
+    // Registers one store per store type (plus two global key-value stores), then checks which TopologyTestDriver getters return each store and which return null.
+    private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
+        final String keyValueStoreName = "keyValueStore";
+        final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+        final String windowStoreName = "windowStore";
+        final String timestampedWindowStoreName = "windowTimestampStore";
+        final String sessionStoreName = "sessionStore";
+        final String globalKeyValueStoreName = "globalKeyValueStore";
+        final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore";
+        // note: when persistent is false, the timestamped builders below wrap the plain Stores.inMemory* suppliers
+        final Topology topology = setupSingleProcessorTopology();
+
+        // add state stores
+        topology.addStateStore(
+            Stores.keyValueStoreBuilder(
+                persistent ?
+                    Stores.persistentKeyValueStore(keyValueStoreName) :
+                    Stores.inMemoryKeyValueStore(keyValueStoreName),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ),
+            "processor");
+        topology.addStateStore(
+            Stores.timestampedKeyValueStoreBuilder(
+                persistent ?
+                    Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName) :
+                    Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ),
+            "processor");
+        topology.addStateStore(
+            Stores.windowStoreBuilder(
+                persistent ?
+                    Stores.persistentWindowStore(windowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) :
+                    Stores.inMemoryWindowStore(windowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ),
+            "processor");
+        topology.addStateStore(
+            Stores.timestampedWindowStoreBuilder(
+                persistent ?
+                    Stores.persistentTimestampedWindowStore(timestampedWindowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) :
+                    Stores.inMemoryWindowStore(timestampedWindowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ),
+            "processor");
+        topology.addStateStore(
+            persistent ?
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(sessionStoreName, Duration.ofMillis(1000L)),
+                    Serdes.ByteArray(),
+                    Serdes.ByteArray()) :
+                Stores.sessionStoreBuilder(
+                    Stores.inMemorySessionStore(sessionStoreName, Duration.ofMillis(1000L)),
+                    Serdes.ByteArray(),
+                    Serdes.ByteArray()),
+            "processor");
+        // add global stores
+        topology.addGlobalStore(
+            persistent ?
+                Stores.keyValueStoreBuilder(
+                    Stores.persistentKeyValueStore(globalKeyValueStoreName),
+                    Serdes.ByteArray(),
+                    Serdes.ByteArray()
+                ).withLoggingDisabled() :
+                Stores.keyValueStoreBuilder(
+                    Stores.inMemoryKeyValueStore(globalKeyValueStoreName),
+                    Serdes.ByteArray(),
+                    Serdes.ByteArray()
+                ).withLoggingDisabled(),
+            "sourceDummy1",
+            Serdes.ByteArray().deserializer(),
+            Serdes.ByteArray().deserializer(),
+            "topicDummy1",
+            "processorDummy1",
+            () -> null);
+        topology.addGlobalStore(
+            persistent ?
+                Stores.timestampedKeyValueStoreBuilder(
+                    Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName),
+                    Serdes.ByteArray(),
+                    Serdes.ByteArray()
+                ).withLoggingDisabled() :
+                Stores.timestampedKeyValueStoreBuilder(
+                    Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName),
+                    Serdes.ByteArray(),
+                    Serdes.ByteArray()
+                ).withLoggingDisabled(),
+            "sourceDummy2",
+            Serdes.ByteArray().deserializer(),
+            Serdes.ByteArray().deserializer(),
+            "topicDummy2",
+            "processorDummy2",
+            () -> null);
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        // verify state stores
+        assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
+        assertNull(testDriver.getWindowStore(keyValueStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
+        assertNull(testDriver.getSessionStore(keyValueStoreName));
+        // a timestamped key-value store is expected from the plain key-value getter as well
+        assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
+        assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
+        assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
+        assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
+        // a plain window store is expected from the window-store getter only
+        assertNull(testDriver.getKeyValueStore(windowStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
+        assertNotNull(testDriver.getWindowStore(windowStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
+        assertNull(testDriver.getSessionStore(windowStoreName));
+        // a timestamped window store is expected from the plain window-store getter as well
+        assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
+        assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
+        assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
+        assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
+        // a session store is expected from the session-store getter only
+        assertNull(testDriver.getKeyValueStore(sessionStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
+        assertNull(testDriver.getWindowStore(sessionStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
+        assertNotNull(testDriver.getSessionStore(sessionStoreName));
+
+        // verify global stores
+        assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
+        assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
+        assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
+        assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
+        // the global timestamped key-value store is expected from both key-value getters
+        assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
+        assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
+        assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
+        assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
+        assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
+    }
+
+    @Test
     public void shouldReturnAllStoresNames() {
         final Topology topology = setupSourceSinkTopology();
         topology.addStateStore(
@@ -793,7 +941,7 @@ public class TopologyTestDriverTest {
         setup();
         testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
-        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+        assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
     }
 
     @Test
@@ -802,7 +950,7 @@ public class TopologyTestDriverTest {
         testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
         assertThat(store.get("a"), equalTo(21L));
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
-        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+        assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
     }
 
     @Test
@@ -811,7 +959,7 @@ public class TopologyTestDriverTest {
         testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
         assertThat(store.get("a"), equalTo(42L));
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
-        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+        assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
     }
 
     @Test
@@ -821,7 +969,7 @@ public class TopologyTestDriverTest {
         assertThat(store.get("b"), equalTo(21L));
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L);
-        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+        assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
     }
 
     @Test
@@ -831,11 +979,11 @@ public class TopologyTestDriverTest {
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
 
         testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
-        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+        assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
 
         testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
-        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+        assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
     }
 
     @Test
@@ -843,7 +991,7 @@ public class TopologyTestDriverTest {
         setup();
         testDriver.advanceWallClockTime(60000);
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
-        Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+        assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
     }
 
     private class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
@@ -944,14 +1092,14 @@ public class TopologyTestDriverTest {
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
 
         try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
-            Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
+            assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
             testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L));
             Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
         }
 
 
         try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
-            Assert.assertNull(
+            assertNull(
                 "Closing the prior test driver should have cleaned up this store and value.",
                 testDriver.getKeyValueStore("storeProcessorStore").get("a")
             );