You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/06 22:57:23 UTC

[kafka] branch trunk updated: KAFKA-3522: Add public interfaces for timestamped stores (#6175)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab00c51  KAFKA-3522: Add public interfaces for timestamped stores (#6175)
ab00c51 is described below

commit ab00c51b6362b41071ae32611ebf698ba9c221ee
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Mar 6 14:57:08 2019 -0800

    KAFKA-3522: Add public interfaces for timestamped stores (#6175)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../kafka/streams/state/QueryableStoreType.java    |   6 +-
 .../kafka/streams/state/QueryableStoreTypes.java   |  63 ++++++-
 .../apache/kafka/streams/state/SessionStore.java   |  10 +-
 .../streams/state/TimestampedWindowStore.java      |  34 ++++
 .../kafka/streams/state/ValueAndTimestamp.java     |  11 +-
 .../apache/kafka/streams/state/WindowStore.java    |   4 +-
 .../state/internals/KeyValueIteratorFacade.java    |  50 +++++
 .../internals/ReadOnlyKeyValueStoreFacade.java     |  52 ++++++
 .../state/internals/ReadOnlyWindowStoreFacade.java | 123 ++++++++++++
 .../internals/KeyValueIteratorFacadeTest.java      |  87 +++++++++
 .../internals/ReadOnlyKeyValueStoreFacadeTest.java |  99 ++++++++++
 .../internals/ReadOnlyWindowStoreFacadeTest.java   | 207 +++++++++++++++++++++
 .../apache/kafka/streams/TopologyTestDriver.java   | 102 +++++++++-
 .../streams/internals/KeyValueStoreFacade.java     |  92 +++++++++
 .../kafka/streams/internals/WindowStoreFacade.java |  76 ++++++++
 .../streams/internals/KeyValueStoreFacadeTest.java | 168 +++++++++++++++++
 .../streams/internals/WindowStoreFacadeTest.java   | 135 ++++++++++++++
 17 files changed, 1300 insertions(+), 19 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
index 3d4f55c..79c335a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -43,7 +43,9 @@ public interface QueryableStoreType<T> {
      *
      * @param storeProvider     provides access to all the underlying StateStore instances
      * @param storeName         The name of the Store
-     * @return a read-only interface over a {@code StateStore} (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
+     * @return a read-only interface over a {@code StateStore}
+     *        (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
      */
-    T create(final StateStoreProvider storeProvider, final String storeName);
+    T create(final StateStoreProvider storeProvider,
+             final String storeName);
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index 297e181..7b1e8b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -43,6 +43,17 @@ public final class QueryableStoreTypes {
     }
 
     /**
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore&lt;K, ValueAndTimestamp&lt;V&gt;&gt;}.
+     *
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @return {@link QueryableStoreTypes.TimestampedKeyValueStoreType}
+     */
+    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore() {
+        return new TimestampedKeyValueStoreType<>();
+    }
+
+    /**
      * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}.
      *
      * @param <K> key type of the store
@@ -54,6 +65,17 @@ public final class QueryableStoreTypes {
     }
 
     /**
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore&lt;K, ValueAndTimestamp&lt;V&gt;&gt;}.
+     *
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @return {@link QueryableStoreTypes.TimestampedWindowStoreType}
+     */
+    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore() {
+        return new TimestampedWindowStoreType<>();
+    }
+
+    /**
      * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
      *
      * @param <K> key type of the store
@@ -79,7 +101,8 @@ public final class QueryableStoreTypes {
         }
     }
 
-    static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
+    public static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
+
         KeyValueStoreType() {
             super(ReadOnlyKeyValueStore.class);
         }
@@ -92,7 +115,22 @@ public final class QueryableStoreTypes {
 
     }
 
-    static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
+    private static class TimestampedKeyValueStoreType<K, V>
+        extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> {
+
+        TimestampedKeyValueStoreType() {
+            super(ReadOnlyKeyValueStore.class);
+        }
+
+        @Override
+        public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
+                                                                     final String storeName) {
+            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
+        }
+    }
+
+    public static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
+
         WindowStoreType() {
             super(ReadOnlyWindowStore.class);
         }
@@ -104,14 +142,31 @@ public final class QueryableStoreTypes {
         }
     }
 
-    static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
+    private static class TimestampedWindowStoreType<K, V>
+        extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> {
+
+        TimestampedWindowStoreType() {
+            super(ReadOnlyWindowStore.class);
+        }
+
+        @Override
+        public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
+                                                                   final String storeName) {
+            return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
+        }
+    }
+
+    public static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
+
         SessionStoreType() {
             super(ReadOnlySessionStore.class);
         }
+
         @Override
         public ReadOnlySessionStore<K, V> create(final StateStoreProvider storeProvider,
                                                  final String storeName) {
             return new CompositeReadOnlySessionStore<>(storeProvider, this, storeName);
         }
     }
-}
+
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index c8b1805..4f897e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -16,11 +16,19 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
 
 /**
- * Interface for storing the aggregated values of sessions
+ * Interface for storing the aggregated values of sessions.
+ * <p>
+ * The key is internally represented as {@link Windowed Windowed&lt;K&gt;} that comprises the plain key
+ * and the {@link Window} that represents window start- and end-timestamp.
+ * <p>
+ * If two sessions are merged, a new session with new start- and end-timestamp must be inserted into the store
+ * while the two old sessions must be deleted.
+ *
  * @param <K>   type of the record keys
  * @param <AGG> type of the aggregated values
  */
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java
new file mode 100644
index 0000000..7d52c12
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.Windowed;
+
+/**
+ * Interface for storing the aggregated values of fixed-size time windows.
+ * <p>
+ * Note that the store's physical key type is {@link Windowed Windowed&lt;K&gt;}.
+ * In contrast to a {@link WindowStore} that stores plain windowedKeys-value pairs,
+ * a {@code TimestampedWindowStore} stores windowedKeys-(value/timestamp) pairs.
+ * <p>
+ * While the window start- and end-timestamp are fixed per window, the value-side timestamp is used
+ * to store the last update timestamp of the corresponding window.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ */
+public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> { }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
index a0acc77..8bb652c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
@@ -36,6 +36,15 @@ public class ValueAndTimestamp<V> {
         this.timestamp = timestamp;
     }
 
+    /**
+     * Create a new {@link ValueAndTimestamp} instance if the provided {@code value} is not {@code null}.
+     *
+     * @param value      the value
+     * @param timestamp  the timestamp
+     * @param <V> the type of the value
+     * @return a new {@link ValueAndTimestamp} instance if the provided {@code value} is not {@code null};
+     *         otherwise {@code null} is returned
+     */
     public static <V> ValueAndTimestamp<V> make(final V value,
                                                 final long timestamp) {
         return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
@@ -71,4 +80,4 @@ public class ValueAndTimestamp<V> {
     public int hashCode() {
         return Objects.hash(value, timestamp);
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 2203f59..f7eb37e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -26,7 +26,9 @@ import java.time.Instant;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
 /**
- * A windowed store interface extending {@link StateStore}.
+ * Interface for storing the aggregated values of fixed-size time windows.
+ * <p>
+ * Note that the store's physical key type is {@link Windowed Windowed&lt;K&gt;}.
  *
  * @param <K> Type of keys
  * @param <V> Type of values
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
new file mode 100644
index 0000000..099c3df
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
+    private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
+
+    public KeyValueIteratorFacade(final KeyValueIterator<K, ValueAndTimestamp<V>> iterator) {
+        innerIterator = iterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return innerIterator.hasNext();
+    }
+
+    @Override
+    public K peekNextKey() {
+        return innerIterator.peekNextKey();
+    }
+
+    @Override
+    public KeyValue<K, V> next() {
+        final KeyValue<K, ValueAndTimestamp<V>> innerKeyValue = innerIterator.next();
+        return KeyValue.pair(innerKeyValue.key, innerKeyValue.value.value());
+    }
+
+    @Override
+    public void close() {
+        innerIterator.close();
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
new file mode 100644
index 0000000..fe24043
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+public class ReadOnlyKeyValueStoreFacade<K, V> implements ReadOnlyKeyValueStore<K, V> {
+    protected final TimestampedKeyValueStore<K, V> inner;
+
+    protected ReadOnlyKeyValueStoreFacade(final TimestampedKeyValueStore<K, V> store) {
+        inner = store;
+    }
+
+    @Override
+    public V get(final K key) {
+        final ValueAndTimestamp<V> valueAndTimestamp = inner.get(key);
+        return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(final K from,
+                                        final K to) {
+        return new KeyValueIteratorFacade<>(inner.range(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new KeyValueIteratorFacade<>(inner.all());
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return inner.approximateNumEntries();
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
new file mode 100644
index 0000000..46db1f2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+
+public class ReadOnlyWindowStoreFacade<K, V> implements ReadOnlyWindowStore<K, V> {
+    protected final TimestampedWindowStore<K, V> inner;
+
+    protected ReadOnlyWindowStoreFacade(final TimestampedWindowStore<K, V> store) {
+        inner = store;
+    }
+
+    @Override
+    public V fetch(final K key,
+                   final long time) {
+        final ValueAndTimestamp<V> valueAndTimestamp = inner.fetch(key, time);
+        return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public WindowStoreIterator<V> fetch(final K key,
+                                        final long timeFrom,
+                                        final long timeTo) {
+        return new WindowStoreIteratorFacade<>(inner.fetch(key, timeFrom, timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<V> fetch(final K key,
+                                        final Instant from,
+                                        final Instant to) throws IllegalArgumentException {
+        return new WindowStoreIteratorFacade<>(inner.fetch(key, from, to));
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                  final K to,
+                                                  final long timeFrom,
+                                                  final long timeTo) {
+        return new KeyValueIteratorFacade<>(inner.fetch(from, to, timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                  final K to,
+                                                  final Instant fromTime,
+                                                  final Instant toTime) throws IllegalArgumentException {
+        return new KeyValueIteratorFacade<>(inner.fetch(from, to, fromTime, toTime));
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                     final long timeTo) {
+        return new KeyValueIteratorFacade<>(inner.fetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from,
+                                                     final Instant to) throws IllegalArgumentException {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> innerIterator = inner.fetchAll(from, to);
+        return new KeyValueIteratorFacade<>(innerIterator);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> all() {
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> innerIterator = inner.all();
+        return new KeyValueIteratorFacade<>(innerIterator);
+    }
+
+    private static class WindowStoreIteratorFacade<V> implements WindowStoreIterator<V> {
+        final KeyValueIterator<Long, ValueAndTimestamp<V>> innerIterator;
+
+        WindowStoreIteratorFacade(final KeyValueIterator<Long, ValueAndTimestamp<V>> iterator) {
+            innerIterator = iterator;
+        }
+
+        @Override
+        public void close() {
+            innerIterator.close();
+        }
+
+        @Override
+        public Long peekNextKey() {
+            return innerIterator.peekNextKey();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return innerIterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<Long, V> next() {
+            final KeyValue<Long, ValueAndTimestamp<V>> innerKeyValue = innerIterator.next();
+            return KeyValue.pair(innerKeyValue.key, innerKeyValue.value.value());
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
new file mode 100644
index 0000000..19566e9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(EasyMockRunner.class)
+public class KeyValueIteratorFacadeTest {
+    @Mock
+    private KeyValueIterator<String, ValueAndTimestamp<String>> mockedKeyValueIterator;
+
+    private KeyValueIteratorFacade<String, String> keyValueIteratorFacade;
+
+    @Before
+    public void setup() {
+        keyValueIteratorFacade = new KeyValueIteratorFacade<>(mockedKeyValueIterator);
+    }
+
+    @Test
+    public void shouldForwardHasNext() {
+        expect(mockedKeyValueIterator.hasNext()).andReturn(true).andReturn(false);
+        replay(mockedKeyValueIterator);
+
+        assertTrue(keyValueIteratorFacade.hasNext());
+        assertFalse(keyValueIteratorFacade.hasNext());
+        verify(mockedKeyValueIterator);
+    }
+
+    @Test
+    public void shouldForwardPeekNextKey() {
+        expect(mockedKeyValueIterator.peekNextKey()).andReturn("key");
+        replay(mockedKeyValueIterator);
+
+        assertThat(keyValueIteratorFacade.peekNextKey(), is("key"));
+        verify(mockedKeyValueIterator);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairOnGet() {
+        expect(mockedKeyValueIterator.next()).andReturn(
+            new KeyValue<>("key", ValueAndTimestamp.make("value", 42L)));
+        replay(mockedKeyValueIterator);
+
+        assertThat(keyValueIteratorFacade.next(), is(KeyValue.pair("key", "value")));
+        verify(mockedKeyValueIterator);
+    }
+
+    @Test
+    public void shouldCloseInnerIterator() {
+        mockedKeyValueIterator.close();
+        expectLastCall();
+        replay(mockedKeyValueIterator);
+
+        keyValueIteratorFacade.close();
+        verify(mockedKeyValueIterator);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
new file mode 100644
index 0000000..4b0ca79
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+
+@RunWith(EasyMockRunner.class)
+public class ReadOnlyKeyValueStoreFacadeTest {
+    @Mock
+    private TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore;
+    @Mock
+    private KeyValueIterator<String, ValueAndTimestamp<String>> mockedKeyValueTimestampIterator;
+
+    private ReadOnlyKeyValueStoreFacade<String, String> readOnlyKeyValueStoreFacade;
+
+    @Before
+    public void setup() {
+        readOnlyKeyValueStoreFacade = new ReadOnlyKeyValueStoreFacade<>(mockedKeyValueTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainValueOnGet() {
+        expect(mockedKeyValueTimestampStore.get("key"))
+            .andReturn(ValueAndTimestamp.make("value", 42L));
+        expect(mockedKeyValueTimestampStore.get("unknownKey"))
+            .andReturn(null);
+        replay(mockedKeyValueTimestampStore);
+
+        assertThat(readOnlyKeyValueStoreFacade.get("key"), is("value"));
+        assertNull(readOnlyKeyValueStoreFacade.get("unknownKey"));
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsForRangeIterator() {
+        expect(mockedKeyValueTimestampIterator.next())
+            .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L)))
+            .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L)));
+        expect(mockedKeyValueTimestampStore.range("key1", "key2")).andReturn(mockedKeyValueTimestampIterator);
+        replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+
+        final KeyValueIterator<String, String> iterator = readOnlyKeyValueStoreFacade.range("key1", "key2");
+        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+        verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsForAllIterator() {
+        expect(mockedKeyValueTimestampIterator.next())
+            .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L)))
+            .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L)));
+        expect(mockedKeyValueTimestampStore.all()).andReturn(mockedKeyValueTimestampIterator);
+        replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+
+        final KeyValueIterator<String, String> iterator = readOnlyKeyValueStoreFacade.all();
+        assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+        verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+    }
+
+    @Test
+    public void shouldForwardApproximateNumEntries() {
+        expect(mockedKeyValueTimestampStore.approximateNumEntries()).andReturn(42L);
+        replay(mockedKeyValueTimestampStore);
+
+        assertThat(readOnlyKeyValueStoreFacade.approximateNumEntries(), is(42L));
+        verify(mockedKeyValueTimestampStore);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
new file mode 100644
index 0000000..47ebeaa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.time.Instant;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+
+@RunWith(EasyMockRunner.class)
+public class ReadOnlyWindowStoreFacadeTest {
+    @Mock
+    private TimestampedWindowStore<String, String> mockedWindowTimestampStore;
+    @Mock
+    private WindowStoreIterator<ValueAndTimestamp<String>> mockedWindowTimestampIterator;
+    @Mock
+    private KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> mockedKeyValueWindowTimestampIterator;
+
+    private ReadOnlyWindowStoreFacade<String, String> readOnlyWindowStoreFacade;
+
+    @Before
+    public void setup() {
+        readOnlyWindowStoreFacade = new ReadOnlyWindowStoreFacade<>(mockedWindowTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsOnSingleKeyFetch() {
+        expect(mockedWindowTimestampStore.fetch("key1", 21L))
+            .andReturn(ValueAndTimestamp.make("value1", 42L));
+        expect(mockedWindowTimestampStore.fetch("unknownKey", 21L))
+            .andReturn(null);
+        replay(mockedWindowTimestampStore);
+
+        assertThat(readOnlyWindowStoreFacade.fetch("key1", 21L), is("value1"));
+        assertNull(readOnlyWindowStoreFacade.fetch("unknownKey", 21L));
+
+        verify(mockedWindowTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsOnSingleKeyFetchLongParameters() {
+        expect(mockedWindowTimestampIterator.next())
+            .andReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1", 22L)))
+            .andReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 23L)));
+        expect(mockedWindowTimestampStore.fetch("key1", 21L, 42L))
+            .andReturn(mockedWindowTimestampIterator);
+        replay(mockedWindowTimestampIterator, mockedWindowTimestampStore);
+
+        final WindowStoreIterator<String> iterator =
+            readOnlyWindowStoreFacade.fetch("key1", 21L, 42L);
+
+        assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+        verify(mockedWindowTimestampIterator, mockedWindowTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsOnSingleKeyFetchInstantParameters() {
+        expect(mockedWindowTimestampIterator.next())
+            .andReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1", 22L)))
+            .andReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 23L)));
+        expect(mockedWindowTimestampStore.fetch("key1", Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+            .andReturn(mockedWindowTimestampIterator);
+        replay(mockedWindowTimestampIterator, mockedWindowTimestampStore);
+
+        final WindowStoreIterator<String> iterator =
+            readOnlyWindowStoreFacade.fetch("key1", Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+        verify(mockedWindowTimestampIterator, mockedWindowTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsOnRangeFetchLongParameters() {
+        expect(mockedKeyValueWindowTimestampIterator.next())
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)))
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)));
+        expect(mockedWindowTimestampStore.fetch("key1", "key2", 21L, 42L))
+            .andReturn(mockedKeyValueWindowTimestampIterator);
+        replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlyWindowStoreFacade.fetch("key1", "key2", 21L, 42L);
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+        verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsOnRangeFetchInstantParameters() {
+        expect(mockedKeyValueWindowTimestampIterator.next())
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)))
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)));
+        expect(mockedWindowTimestampStore.fetch("key1", "key2", Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+            .andReturn(mockedKeyValueWindowTimestampIterator);
+        replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlyWindowStoreFacade.fetch("key1", "key2", Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+        verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsOnFetchAllLongParameters() {
+        expect(mockedKeyValueWindowTimestampIterator.next())
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)))
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)));
+        expect(mockedWindowTimestampStore.fetchAll(21L, 42L))
+            .andReturn(mockedKeyValueWindowTimestampIterator);
+        replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlyWindowStoreFacade.fetchAll(21L, 42L);
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+        verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsOnFetchAllInstantParameters() {
+        expect(mockedKeyValueWindowTimestampIterator.next())
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)))
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)));
+        expect(mockedWindowTimestampStore.fetchAll(Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+            .andReturn(mockedKeyValueWindowTimestampIterator);
+        replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+        final KeyValueIterator<Windowed<String>, String> iterator =
+            readOnlyWindowStoreFacade.fetchAll(Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+        verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairsOnAll() {
+        expect(mockedKeyValueWindowTimestampIterator.next())
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key1", new TimeWindow(21L, 22L)),
+                ValueAndTimestamp.make("value1", 22L)))
+            .andReturn(KeyValue.pair(
+                new Windowed<>("key2", new TimeWindow(42L, 43L)),
+                ValueAndTimestamp.make("value2", 100L)));
+        expect(mockedWindowTimestampStore.all()).andReturn(mockedKeyValueWindowTimestampIterator);
+        replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+        final KeyValueIterator<Windowed<String>, String> iterator = readOnlyWindowStoreFacade.all();
+
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+        assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+        verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+    }
+}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index e8f4fd6..b00d229 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -40,7 +40,9 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.KeyValueStoreFacade;
 import org.apache.kafka.streams.internals.QuietStreamsConfig;
+import org.apache.kafka.streams.internals.WindowStoreFacade;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -62,10 +64,15 @@ import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -175,6 +182,8 @@ import java.util.regex.Pattern;
 @InterfaceStability.Evolving
 public class TopologyTestDriver implements Closeable {
 
+    private static final Logger log = LoggerFactory.getLogger(TopologyTestDriver.class);
+
     private final Time mockWallClockTime;
     private final InternalTopologyBuilder internalTopologyBuilder;
 
@@ -245,7 +254,7 @@ public class TopologyTestDriver implements Closeable {
         processorTopology = internalTopologyBuilder.build(null);
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
         final boolean createStateDirectory = processorTopology.hasPersistentLocalStore() ||
-                (globalTopology != null && globalTopology.hasPersistentGlobalStore());
+            (globalTopology != null && globalTopology.hasPersistentGlobalStore());
 
         final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
@@ -465,7 +474,8 @@ public class TopologyTestDriver implements Closeable {
             // Forward back into the topology if the produced record is to an internal or a source topic ...
             final String outputTopicName = record.topic();
             if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
-                    || globalPartitionsByTopic.containsKey(outputTopicName)) {
+                || globalPartitionsByTopic.containsKey(outputTopicName)) {
+
                 final byte[] serializedKey = record.key();
                 final byte[] serializedValue = record.value();
 
@@ -564,7 +574,9 @@ public class TopologyTestDriver implements Closeable {
      * @return all stores my name
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
      * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
      */
     @SuppressWarnings("WeakerAccess")
@@ -587,7 +599,9 @@ public class TopologyTestDriver implements Closeable {
      * @return the state store, or {@code null} if no store has been registered with the given name
      * @see #getAllStateStores()
      * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
      * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
      */
     @SuppressWarnings("WeakerAccess")
@@ -611,46 +625,112 @@ public class TopologyTestDriver implements Closeable {
     }
 
     /**
-     * Get the {@link KeyValueStore} with the given name.
+     * Get the {@link KeyValueStore} or {@link TimestampedKeyValueStore} with the given name.
      * The store can be a "regular" or global store.
      * <p>
+     * If the registered store is a {@link TimestampedKeyValueStore} this method will return a value-only query
+     * interface. <strong>It is highly recommended to update the code for this case to avoid bugs and to use
+     * {@link #getTimestampedKeyValueStore(String)} for full store access instead.</strong>
+     * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
      * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
      *
      * @param name the name of the store
-     * @return the key value store, or {@code null} if no {@link KeyValueStore} has been registered with the given name
+     * @return the key value store, or {@code null} if no {@link KeyValueStore} or {@link TimestampedKeyValueStore}
+     * has been registered with the given name
      * @see #getAllStateStores()
      * @see #getStateStore(String)
+     * @see #getTimestampedKeyValueStore(String)
      * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
      */
     @SuppressWarnings({"unchecked", "WeakerAccess"})
     public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
         final StateStore store = getStateStore(name);
+        if (store instanceof TimestampedKeyValueStore) {
+            log.info("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
+            return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
+        }
         return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
     }
 
     /**
-     * Get the {@link WindowStore} with the given name.
+     * Get the {@link TimestampedKeyValueStore} with the given name.
      * The store can be a "regular" or global store.
      * <p>
      * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
      * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
      *
      * @param name the name of the store
-     * @return the key value store, or {@code null} if no {@link WindowStore} has been registered with the given name
+     * @return the key value store, or {@code null} if no {@link TimestampedKeyValueStore} has been registered with the given name
      * @see #getAllStateStores()
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
-     * @see #getSessionStore(String) (String)
+     * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
+     * @see #getSessionStore(String)
      */
-    @SuppressWarnings({"unchecked", "WeakerAccess", "unused"})
+    @SuppressWarnings({"unchecked", "WeakerAccess"})
+    public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name) {
+        final StateStore store = getStateStore(name);
+        return store instanceof TimestampedKeyValueStore ? (TimestampedKeyValueStore<K, V>) store : null;
+    }
+
+    /**
+     * Get the {@link WindowStore} or {@link TimestampedWindowStore} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * If the registered store is a {@link TimestampedWindowStore} this method will return a value-only query
+     * interface. <strong>It is highly recommended to update the code for this case to avoid bugs and to use
+     * {@link #getTimestampedWindowStore(String)} for full store access instead.</strong>
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+     * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the window store, or {@code null} if no {@link WindowStore} or {@link TimestampedWindowStore}
+     * has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
+     * @see #getTimestampedWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings({"unchecked", "WeakerAccess"})
     public <K, V> WindowStore<K, V> getWindowStore(final String name) {
         final StateStore store = getStateStore(name);
+        if (store instanceof TimestampedWindowStore) {
+            log.info("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
+            return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
+        }
         return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
     }
 
     /**
+     * Get the {@link TimestampedWindowStore} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+     * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the timestamped window store, or {@code null} if no {@link TimestampedWindowStore} has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
+     * @see #getWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings({"unchecked", "WeakerAccess"})
+    public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name) {
+        final StateStore store = getStateStore(name);
+        return store instanceof TimestampedWindowStore ? (TimestampedWindowStore<K, V>) store : null;
+    }
+
+    /**
      * Get the {@link SessionStore} with the given name.
      * The store can be a "regular" or global store.
      * <p>
@@ -662,9 +742,11 @@ public class TopologyTestDriver implements Closeable {
      * @see #getAllStateStores()
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
      * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
      */
-    @SuppressWarnings({"unchecked", "WeakerAccess", "unused"})
+    @SuppressWarnings({"unchecked", "WeakerAccess"})
     public <K, V> SessionStore<K, V> getSessionStore(final String name) {
         final StateStore store = getStateStore(name);
         return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
@@ -753,4 +835,4 @@ public class TopologyTestDriver implements Closeable {
         }
         return consumer;
     }
-}
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
new file mode 100644
index 0000000..7f1da6e
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
+
+import java.util.List;
+
+public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
+
+    public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner) {
+        super(inner);
+    }
+
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        inner.init(context, root);
+    }
+
+    @Override
+    public void put(final K key,
+                    final V value) {
+        inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
+    }
+
+    @Override
+    public V putIfAbsent(final K key,
+                         final V value) {
+        final ValueAndTimestamp<V> old = inner.putIfAbsent(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
+        return old == null ? null : old.value();
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<K, V>> entries) {
+        for (final KeyValue<K, V> entry : entries) {
+            inner.put(entry.key, ValueAndTimestamp.make(entry.value, ConsumerRecord.NO_TIMESTAMP));
+        }
+    }
+
+    @Override
+    public V delete(final K key) {
+        final ValueAndTimestamp<V> old = inner.delete(key);
+        return old == null ? null : old.value();
+    }
+
+    @Override
+    public void flush() {
+        inner.flush();
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return inner.isOpen();
+    }
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
new file mode 100644
index 0000000..f6f8f33
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
+
+public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> implements WindowStore<K, V> {
+
+    public WindowStoreFacade(final TimestampedWindowStore<K, V> store) {
+        super(store);
+    }
+
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        inner.init(context, root);
+    }
+
+    @Override
+    public void put(final K key,
+                    final V value) {
+        inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
+    }
+
+    @Override
+    public void put(final K key,
+                    final V value,
+                    final long windowStartTimestamp) {
+        inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
+    }
+
+    @Override
+    public void flush() {
+        inner.flush();
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return inner.isOpen();
+    }
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
new file mode 100644
index 0000000..b7814c7
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static java.util.Arrays.asList;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for {@link KeyValueStoreFacade}.
+ * <p>
+ * Each test records the call expected on a mocked {@link TimestampedKeyValueStore}, switches the mock
+ * to replay mode, invokes the facade, and finally verifies that the recorded call(s) happened.
+ */
+@RunWith(EasyMockRunner.class)
+public class KeyValueStoreFacadeTest {
+    @Mock
+    private TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore;
+    // NOTE(review): injected by the runner but not referenced by any test in this class.
+    @Mock
+    private KeyValueIterator<String, ValueAndTimestamp<String>> mockedKeyValueTimestampIterator;
+
+    private KeyValueStoreFacade<String, String> keyValueStoreFacade;
+
+    @Before
+    public void setup() {
+        keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
+    }
+
+    // Builds the wrapped value the tests expect the facade to write: the plain value plus NO_TIMESTAMP.
+    private static ValueAndTimestamp<String> unknownTimestamp(final String value) {
+        return ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP);
+    }
+
+    // init(context, root) must reach the wrapped store with the very same arguments.
+    @Test
+    public void shouldForwardInit() {
+        final StateStore rootStore = mock(StateStore.class);
+        final ProcessorContext processorContext = mock(ProcessorContext.class);
+        mockedKeyValueTimestampStore.init(processorContext, rootStore);
+        expectLastCall();
+        replay(mockedKeyValueTimestampStore);
+
+        keyValueStoreFacade.init(processorContext, rootStore);
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // A plain put must arrive at the wrapped store as (key, value + NO_TIMESTAMP).
+    @Test
+    public void shouldPutWithUnknownTimestamp() {
+        final ValueAndTimestamp<String> expectedWrite = unknownTimestamp("value");
+        mockedKeyValueTimestampStore.put("key", expectedWrite);
+        expectLastCall();
+        replay(mockedKeyValueTimestampStore);
+
+        keyValueStoreFacade.put("key", "value");
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // putIfAbsent must wrap the new value and unwrap whatever the store returns (null, then "oldValue").
+    @Test
+    public void shouldPutIfAbsentWithUnknownTimestamp() {
+        final ValueAndTimestamp<String> expectedWrite = unknownTimestamp("value");
+        final ValueAndTimestamp<String> existing = ValueAndTimestamp.make("oldValue", 42L);
+        expect(mockedKeyValueTimestampStore.putIfAbsent("key", expectedWrite)).andReturn(null).andReturn(existing);
+        replay(mockedKeyValueTimestampStore);
+
+        final String firstResult = keyValueStoreFacade.putIfAbsent("key", "value");
+        assertNull(firstResult);
+        final String secondResult = keyValueStoreFacade.putIfAbsent("key", "value");
+        assertThat(secondResult, is("oldValue"));
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // putAll is expected to show up on the wrapped store as one put per entry, each with NO_TIMESTAMP.
+    @Test
+    public void shouldPutAllWithUnknownTimestamp() {
+        mockedKeyValueTimestampStore.put("key1", unknownTimestamp("value1"));
+        mockedKeyValueTimestampStore.put("key2", unknownTimestamp("value2"));
+        expectLastCall();
+        replay(mockedKeyValueTimestampStore);
+
+        keyValueStoreFacade.putAll(asList(KeyValue.pair("key1", "value1"), KeyValue.pair("key2", "value2")));
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // delete must return the plain value: null when the store returns null, otherwise the unwrapped value.
+    @Test
+    public void shouldDeleteAndReturnPlainValue() {
+        final ValueAndTimestamp<String> existing = ValueAndTimestamp.make("oldValue", 42L);
+        expect(mockedKeyValueTimestampStore.delete("key")).andReturn(null).andReturn(existing);
+        replay(mockedKeyValueTimestampStore);
+
+        final String firstResult = keyValueStoreFacade.delete("key");
+        assertNull(firstResult);
+        final String secondResult = keyValueStoreFacade.delete("key");
+        assertThat(secondResult, is("oldValue"));
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // flush() must be passed through.
+    @Test
+    public void shouldForwardFlush() {
+        mockedKeyValueTimestampStore.flush();
+        expectLastCall();
+        replay(mockedKeyValueTimestampStore);
+
+        keyValueStoreFacade.flush();
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // close() must be passed through.
+    @Test
+    public void shouldForwardClose() {
+        mockedKeyValueTimestampStore.close();
+        expectLastCall();
+        replay(mockedKeyValueTimestampStore);
+
+        keyValueStoreFacade.close();
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // name() must report the wrapped store's name.
+    @Test
+    public void shouldReturnName() {
+        expect(mockedKeyValueTimestampStore.name()).andReturn("name");
+        replay(mockedKeyValueTimestampStore);
+
+        final String reportedName = keyValueStoreFacade.name();
+        assertThat(reportedName, is("name"));
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // persistent() must mirror the wrapped store for both possible answers.
+    @Test
+    public void shouldReturnIsPersistent() {
+        expect(mockedKeyValueTimestampStore.persistent()).andReturn(true).andReturn(false);
+        replay(mockedKeyValueTimestampStore);
+
+        final boolean firstAnswer = keyValueStoreFacade.persistent();
+        assertThat(firstAnswer, is(true));
+        final boolean secondAnswer = keyValueStoreFacade.persistent();
+        assertThat(secondAnswer, is(false));
+
+        verify(mockedKeyValueTimestampStore);
+    }
+
+    // isOpen() must mirror the wrapped store for both possible answers.
+    @Test
+    public void shouldReturnIsOpen() {
+        expect(mockedKeyValueTimestampStore.isOpen()).andReturn(true).andReturn(false);
+        replay(mockedKeyValueTimestampStore);
+
+        final boolean firstAnswer = keyValueStoreFacade.isOpen();
+        assertThat(firstAnswer, is(true));
+        final boolean secondAnswer = keyValueStoreFacade.isOpen();
+        assertThat(secondAnswer, is(false));
+
+        verify(mockedKeyValueTimestampStore);
+    }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
new file mode 100644
index 0000000..972b308
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Unit tests for {@link WindowStoreFacade}.
+ * <p>
+ * Each test records the call expected on a mocked {@link TimestampedWindowStore}, switches the mock
+ * to replay mode, invokes the facade, and finally verifies that the recorded call(s) happened.
+ */
+@RunWith(EasyMockRunner.class)
+public class WindowStoreFacadeTest {
+    @Mock
+    private TimestampedWindowStore<String, String> mockedWindowTimestampStore;
+
+    private WindowStoreFacade<String, String> windowStoreFacade;
+
+    @Before
+    public void setup() {
+        windowStoreFacade = new WindowStoreFacade<>(mockedWindowTimestampStore);
+    }
+
+    // Builds the wrapped value the tests expect the facade to write: the plain value plus NO_TIMESTAMP.
+    private static ValueAndTimestamp<String> unknownTimestamp(final String value) {
+        return ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP);
+    }
+
+    // init(context, root) must reach the wrapped store with the very same arguments.
+    @Test
+    public void shouldForwardInit() {
+        final StateStore rootStore = mock(StateStore.class);
+        final ProcessorContext processorContext = mock(ProcessorContext.class);
+        mockedWindowTimestampStore.init(processorContext, rootStore);
+        expectLastCall();
+        replay(mockedWindowTimestampStore);
+
+        windowStoreFacade.init(processorContext, rootStore);
+
+        verify(mockedWindowTimestampStore);
+    }
+
+    // The two-argument put must arrive at the wrapped store as (key, value + NO_TIMESTAMP).
+    @Test
+    public void shouldPutWithUnknownTimestamp() {
+        final ValueAndTimestamp<String> expectedWrite = unknownTimestamp("value");
+        mockedWindowTimestampStore.put("key", expectedWrite);
+        expectLastCall();
+        replay(mockedWindowTimestampStore);
+
+        windowStoreFacade.put("key", "value");
+
+        verify(mockedWindowTimestampStore);
+    }
+
+    // The three-argument put must keep the window start (21L) and wrap the value with NO_TIMESTAMP.
+    @Test
+    public void shouldPutWindowStartTimestampWithUnknownTimestamp() {
+        final ValueAndTimestamp<String> expectedWrite = unknownTimestamp("value");
+        mockedWindowTimestampStore.put("key", expectedWrite, 21L);
+        expectLastCall();
+        replay(mockedWindowTimestampStore);
+
+        windowStoreFacade.put("key", "value", 21L);
+
+        verify(mockedWindowTimestampStore);
+    }
+
+    // flush() must be passed through.
+    @Test
+    public void shouldForwardFlush() {
+        mockedWindowTimestampStore.flush();
+        expectLastCall();
+        replay(mockedWindowTimestampStore);
+
+        windowStoreFacade.flush();
+
+        verify(mockedWindowTimestampStore);
+    }
+
+    // close() must be passed through.
+    @Test
+    public void shouldForwardClose() {
+        mockedWindowTimestampStore.close();
+        expectLastCall();
+        replay(mockedWindowTimestampStore);
+
+        windowStoreFacade.close();
+
+        verify(mockedWindowTimestampStore);
+    }
+
+    // name() must report the wrapped store's name.
+    @Test
+    public void shouldReturnName() {
+        expect(mockedWindowTimestampStore.name()).andReturn("name");
+        replay(mockedWindowTimestampStore);
+
+        final String reportedName = windowStoreFacade.name();
+        assertThat(reportedName, is("name"));
+
+        verify(mockedWindowTimestampStore);
+    }
+
+    // persistent() must mirror the wrapped store for both possible answers.
+    @Test
+    public void shouldReturnIsPersistent() {
+        expect(mockedWindowTimestampStore.persistent()).andReturn(true).andReturn(false);
+        replay(mockedWindowTimestampStore);
+
+        final boolean firstAnswer = windowStoreFacade.persistent();
+        assertThat(firstAnswer, is(true));
+        final boolean secondAnswer = windowStoreFacade.persistent();
+        assertThat(secondAnswer, is(false));
+
+        verify(mockedWindowTimestampStore);
+    }
+
+    // isOpen() must mirror the wrapped store for both possible answers.
+    @Test
+    public void shouldReturnIsOpen() {
+        expect(mockedWindowTimestampStore.isOpen()).andReturn(true).andReturn(false);
+        replay(mockedWindowTimestampStore);
+
+        final boolean firstAnswer = windowStoreFacade.isOpen();
+        assertThat(firstAnswer, is(true));
+        final boolean secondAnswer = windowStoreFacade.isOpen();
+        assertThat(secondAnswer, is(false));
+
+        verify(mockedWindowTimestampStore);
+    }
+}