Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/12 09:51:20 UTC

[kafka] branch trunk updated: KAFKA-6521: Use timestamped stores for KTables (#6667)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8649717  KAFKA-6521: Use timestamped stores for KTables (#6667)
8649717 is described below

commit 8649717d6dde081c75fc441a56f63ee1556dc758
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sun May 12 11:50:55 2019 +0200

    KAFKA-6521: Use timestamped stores for KTables (#6667)
    
    Reviewers: John Roesler <jo...@confluent.io>, Boyang Chen <bo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../kstream/internals/KGroupedStreamImpl.java      |   2 +-
 .../kstream/internals/KGroupedTableImpl.java       |   2 +-
 .../kstream/internals/KStreamAggregate.java        |  38 +++--
 .../streams/kstream/internals/KStreamReduce.java   |  37 ++--
 .../kstream/internals/KStreamWindowAggregate.java  |  40 ++---
 .../streams/kstream/internals/KTableAggregate.java |  22 ++-
 .../streams/kstream/internals/KTableFilter.java    |  22 ++-
 .../streams/kstream/internals/KTableImpl.java      |  17 +-
 .../kstream/internals/KTableKTableJoinMerger.java  |  17 +-
 .../streams/kstream/internals/KTableMapValues.java |  22 ++-
 .../KTableMaterializedValueGetterSupplier.java     |  14 +-
 .../streams/kstream/internals/KTableReduce.java    |  22 ++-
 .../streams/kstream/internals/KTableSource.java    |  22 ++-
 .../internals/KTableSourceValueGetterSupplier.java |  16 +-
 .../kstream/internals/KTableTransformValues.java   |  29 ++--
 .../internals/SessionWindowedKStreamImpl.java      |   3 +-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  16 +-
 .../internals/TimestampedCacheFlushListener.java   |  53 ++++++
 ...a => TimestampedKeyValueStoreMaterializer.java} |  11 +-
 ...rwarder.java => TimestampedTupleForwarder.java} |  25 +--
 .../streams/kstream/internals/TupleForwarder.java  |  15 +-
 .../internals/graph/KTableKTableJoinNode.java      |  25 +--
 .../internals/graph/TableProcessorNode.java        |   8 +-
 .../kstream/internals/graph/TableSourceNode.java   |   9 +-
 .../org/apache/kafka/streams/processor/To.java     |  23 +++
 .../internals/GlobalProcessorContextImpl.java      |  10 +-
 .../processor/internals/ProcessorContextImpl.java  |   6 +-
 .../kafka/streams/state/ValueAndTimestamp.java     |  14 +-
 .../state/internals/CachingKeyValueStore.java      |   6 +-
 .../internals/MeteredTimestampedKeyValueStore.java |   2 +-
 .../internals/TimestampedKeyValueStoreBuilder.java | 100 ++++++++++-
 .../internals/TimestampedWindowStoreBuilder.java   | 107 +++++++++++-
 .../kstream/internals/KTableReduceTest.java        |  14 +-
 .../internals/KTableTransformValuesTest.java       |   6 +-
 .../TimestampedCacheFlushListenerTest.java         |  75 ++++++++
 ...est.java => TimestampedTupleForwarderTest.java} |  31 ++--
 .../kstream/internals/TupleForwarderTest.java      |  19 ++-
 .../internals/GlobalStreamThreadTest.java          |   4 +-
 .../processor/internals/StandbyTaskTest.java       |  30 ++--
 ... TimestampedKeyValueStoreMaterializerTest.java} |  50 +++---
 .../org/apache/kafka/streams/state/StoresTest.java |  32 ++++
 .../TimestampedKeyValueStoreBuilderTest.java       |   8 +-
 .../TimestampedWindowStoreBuilderTest.java         |   7 +-
 .../GenericInMemoryTimestampedKeyValueStore.java   | 190 +++++++++++++++++++++
 44 files changed, 946 insertions(+), 275 deletions(-)
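
For orientation before the diff: the core of this change is that KTable-backing processors now read and write ValueAndTimestamp objects through TimestampedKeyValueStore instead of plain values through KeyValueStore. Below is a minimal sketch of that read-modify-write pattern using the public Stores / TimestampedKeyValueStore / ValueAndTimestamp API that appears in the diff; the processor class, store name, and counting logic are invented purely for illustration and are not part of the commit.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

public class TimestampedCountProcessor extends AbstractProcessor<String, String> {

    // Store name invented for this sketch.
    static final String STORE_NAME = "counts-store";

    // Builds a timestamped key-value store, analogous to what
    // TimestampedKeyValueStoreMaterializer now produces for materialized KTables.
    static StoreBuilder<TimestampedKeyValueStore<String, Long>> storeBuilder() {
        return Stores.timestampedKeyValueStoreBuilder(
            Stores.persistentTimestampedKeyValueStore(STORE_NAME),
            Serdes.String(),
            Serdes.Long());
    }

    private TimestampedKeyValueStore<String, Long> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (TimestampedKeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
    }

    @Override
    public void process(final String key, final String value) {
        // Reads return a ValueAndTimestamp; unwrap it null-safely with getValueOrNull.
        final Long oldCount = getValueOrNull(store.get(key));
        final long newCount = (oldCount == null ? 0L : oldCount) + 1L;
        // Writes pair the new value with the current record timestamp.
        store.put(key, ValueAndTimestamp.make(newCount, context().timestamp()));
    }
}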

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index eab1e8f..6f6521c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -185,7 +185,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
                                          final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
         return aggregateBuilder.build(
             functionName,
-            new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+            new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(),
             aggregateSupplier,
             materializedInternal.queryableStoreName(),
             materializedInternal.keySerde(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 56be0f6..75c998a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -88,7 +88,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
             funcName,
             new ProcessorParameters<>(aggregateSupplier, funcName),
-            new KeyValueStoreMaterializer<>(materialized).materialize()
+            new TimestampedKeyValueStoreMaterializer<>(materialized).materialize()
         );
 
         // now the repartition node must be the parent of the StateProcessorNode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 4ead76b..7d367d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -22,17 +22,19 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
     private final String storeName;
     private final Initializer<T> initializer;
     private final Aggregator<? super K, ? super V, T> aggregator;
 
-
     private boolean sendOldValues = false;
 
     KStreamAggregate(final String storeName, final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator) {
@@ -51,22 +53,25 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         sendOldValues = true;
     }
 
-    private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
 
-        private KeyValueStore<K, T> store;
+    private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedKeyValueStore<K, T> store;
         private StreamsMetricsImpl metrics;
-        private TupleForwarder<K, T> tupleForwarder;
+        private TimestampedTupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
-            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+            store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                store,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
-
         @Override
         public void process(final K key, final V value) {
             // If the key or value is null we don't need to proceed
@@ -79,7 +84,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
                 return;
             }
 
-            T oldAgg = store.get(key);
+            final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
+            T oldAgg = getValueOrNull(oldAggAndTimestamp);
 
             if (oldAgg == null) {
                 oldAgg = initializer.apply();
@@ -91,14 +97,13 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
             newAgg = aggregator.apply(key, value, newAgg);
 
             // update the store with the new value
-            store.put(key, newAgg);
+            store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
             tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
     }
 
     @Override
     public KTableValueGetterSupplier<K, T> view() {
-
         return new KTableValueGetterSupplier<K, T>() {
 
             public KTableValueGetter<K, T> get() {
@@ -112,23 +117,22 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         };
     }
 
-    private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
 
-        private KeyValueStore<K, T> store;
+    private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
+        private TimestampedKeyValueStore<K, T> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+            store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
         @Override
         public T get(final K key) {
-            return store.get(key);
+            return getValueOrNull(store.get(key));
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index cd67283..a1b9f73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -21,10 +21,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class);
 
@@ -48,10 +51,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
         sendOldValues = true;
     }
 
-    private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
 
-        private KeyValueStore<K, V> store;
-        private TupleForwarder<K, V> tupleForwarder;
+    private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
+        private TimestampedKeyValueStore<K, V> store;
+        private TimestampedTupleForwarder<K, V> tupleForwarder;
         private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
@@ -59,11 +62,14 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
+            store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                store,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
-
         @Override
         public void process(final K key, final V value) {
             // If the key or value is null we don't need to proceed
@@ -76,7 +82,8 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
                 return;
             }
 
-            final V oldAgg = store.get(key);
+            final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
+            final V oldAgg = getValueOrNull(oldAggAndTimestamp);
             V newAgg = oldAgg;
 
             // try to add the new value
@@ -87,14 +94,13 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             }
 
             // update the store with the new value
-            store.put(key, newAgg);
+            store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
             tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
     }
 
     @Override
     public KTableValueGetterSupplier<K, V> view() {
-
         return new KTableValueGetterSupplier<K, V>() {
 
             public KTableValueGetter<K, V> get() {
@@ -108,24 +114,23 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
         };
     }
 
-    private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
 
-        private KeyValueStore<K, V> store;
+    private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
+        private TimestampedKeyValueStore<K, V> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+            store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
         @Override
         public V get(final K key) {
-            return store.get(key);
+            return getValueOrNull(store.get(key));
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
 }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index d09dadd..95ae54f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -29,12 +29,15 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -69,10 +72,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         sendOldValues = true;
     }
 
-    private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
 
-        private WindowStore<K, Agg> windowStore;
-        private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
+    private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
@@ -83,13 +86,14 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         public void init(final ProcessorContext context) {
             super.init(context);
             internalProcessorContext = (InternalProcessorContext) context;
-
             metrics = (StreamsMetricsImpl) context.metrics();
-
             lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
-
-            windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                windowStore,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
         @Override
@@ -115,7 +119,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
                 final Long windowStart = entry.getKey();
                 final long windowEnd = entry.getValue().end();
                 if (windowEnd > closeTime) {
-                    Agg oldAgg = windowStore.fetch(key, windowStart);
+                    final ValueAndTimestamp<Agg> oldAggAndTimestamp = windowStore.fetch(key, windowStart);
+                    Agg oldAgg = getValueOrNull(oldAggAndTimestamp);
 
                     if (oldAgg == null) {
                         oldAgg = initializer.apply();
@@ -124,7 +129,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
                     final Agg newAgg = aggregator.apply(key, value, oldAgg);
 
                     // update the store with the new value
-                    windowStore.put(key, newAgg, windowStart);
+                    windowStore.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()), windowStart);
                     tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
                 } else {
                     log.debug(
@@ -154,7 +159,6 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
 
     @Override
     public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
-
         return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
 
             public KTableValueGetter<Windowed<K>, Agg> get() {
@@ -168,14 +172,14 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         };
     }
 
-    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
 
-        private WindowStore<K, Agg> windowStore;
+    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
+        private TimestampedWindowStore<K, Agg> windowStore;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
         }
 
         @SuppressWarnings("unchecked")
@@ -183,12 +187,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         public Agg get(final Windowed<K> windowedKey) {
             final K key = windowedKey.key();
             final W window = (W) windowedKey.window();
-
-            return windowStore.fetch(key, window.start());
+            return getValueOrNull(windowStore.fetch(key, window.start()));
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 1c44486..88bf867 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -22,7 +22,10 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
 public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
 
@@ -54,15 +57,19 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
     }
 
     private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
-        private KeyValueStore<K, T> store;
-        private TupleForwarder<K, T> tupleForwarder;
+        private TimestampedKeyValueStore<K, T> store;
+        private TimestampedTupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+            store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                store,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
         /**
@@ -75,7 +82,8 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
                 throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
             }
 
-            final T oldAgg = store.get(key);
+            final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
+            final T oldAgg = getValueOrNull(oldAggAndTimestamp);
             final T intermediateAgg;
 
             // first try to remove the old value
@@ -101,7 +109,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
             }
 
             // update the store with the new value
-            store.put(key, newAgg);
+            store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
             tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 2410074..afa1822 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -20,10 +20,10 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
-
     private final KTableImpl<K, ?, V> parent;
     private final Predicate<? super K, ? super V> predicate;
     private final boolean filterNot;
@@ -61,18 +61,22 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         return newValue;
     }
 
+
     private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
-        private KeyValueStore<K, V> store;
-        private TupleForwarder<K, V> tupleForwarder;
+        private TimestampedKeyValueStore<K, V> store;
+        private TimestampedTupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
-                store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context,
-                    new ForwardingCacheFlushListener<>(context), sendOldValues);
+                store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    store,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
             }
         }
 
@@ -86,13 +90,12 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
             }
 
             if (queryableName != null) {
-                store.put(key, newValue);
+                store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
                 tupleForwarder.maybeForward(key, newValue, oldValue);
             } else {
                 context().forward(key, new Change<>(newValue, oldValue));
             }
         }
-
     }
 
     @Override
@@ -117,6 +120,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         }
     }
 
+
     private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
         private final KTableValueGetter<K, V> parentGetter;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c86f6ce..3a4994f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +118,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final Serde<K> keySerde;
         final Serde<V> valueSerde;
         final String queryableStoreName;
-        final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder;
 
         if (materializedInternal != null) {
             // we actually do not need to generate store names at all since if it is not specified, we will not
@@ -132,7 +133,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valSerde;
             queryableStoreName = materializedInternal.queryableStoreName();
             // only materialize if materialized is specified and it has queryable name
-            storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+            storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
         } else {
             keySerde = this.keySerde;
             valueSerde = this.valSerde;
@@ -204,7 +205,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final Serde<K> keySerde;
         final Serde<VR> valueSerde;
         final String queryableStoreName;
-        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+        final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
 
         if (materializedInternal != null) {
             // we actually do not need to generate store names at all since if it is not specified, we will not
@@ -216,7 +217,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             valueSerde = materializedInternal.valueSerde();
             queryableStoreName = materializedInternal.queryableStoreName();
             // only materialize if materialized is specified and it has queryable name
-            storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+            storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
         } else {
             keySerde = this.keySerde;
             valueSerde = null;
@@ -312,7 +313,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final Serde<K> keySerde;
         final Serde<VR> valueSerde;
         final String queryableStoreName;
-        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+        final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
 
         if (materializedInternal != null) {
             // don't inherit parent value serde, since this operation may change the value type, more specifically:
@@ -322,7 +323,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             valueSerde = materializedInternal.valueSerde();
             queryableStoreName = materializedInternal.queryableStoreName();
             // only materialize if materialized is specified and it has queryable name
-            storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+            storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
         } else {
             keySerde = this.keySerde;
             valueSerde = null;
@@ -538,13 +539,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final Serde<K> keySerde;
         final Serde<VR> valueSerde;
         final String queryableStoreName;
-        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+        final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
 
         if (materializedInternal != null) {
             keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
             valueSerde = materializedInternal.valueSerde();
             queryableStoreName = materializedInternal.storeName();
-            storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
+            storeBuilder = new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
         } else {
             keySerde = this.keySerde;
             valueSerde = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index de38042..86088c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -19,7 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -94,17 +95,19 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
     }
 
     private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> {
-        private KeyValueStore<K, V> store;
-        private TupleForwarder<K, V> tupleForwarder;
+        private TimestampedKeyValueStore<K, V> store;
+        private TimestampedTupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
-                store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context,
-                    new ForwardingCacheFlushListener<K, V>(context),
+                store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    store,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
             }
         }
@@ -112,7 +115,7 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
         @Override
         public void process(final K key, final Change<V> value) {
             if (queryableName != null) {
-                store.put(key, value.newValue);
+                store.put(key, ValueAndTimestamp.make(value.newValue, context().timestamp()));
                 tupleForwarder.maybeForward(key, value.newValue, sendOldValues ? value.oldValue : null);
             } else {
                 if (sendOldValues) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index aae1437..496127c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -20,11 +20,11 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 
 class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
-
     private final KTableImpl<K, ?, V> parent;
     private final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper;
     private final String queryableName;
@@ -81,17 +81,22 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
         return newValue;
     }
 
+
     private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
-        private KeyValueStore<K, V1> store;
-        private TupleForwarder<K, V1> tupleForwarder;
+        private TimestampedKeyValueStore<K, V1> store;
+        private TimestampedTupleForwarder<K, V1> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
-                store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context), sendOldValues);
+                store = (TimestampedKeyValueStore<K, V1>) context.getStateStore(queryableName);
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    store,
+                    context,
+                    new TimestampedCacheFlushListener<K, V1>(context),
+                    sendOldValues);
             }
         }
 
@@ -101,7 +106,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
             final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
 
             if (queryableName != null) {
-                store.put(key, newValue);
+                store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
                 tupleForwarder.maybeForward(key, newValue, oldValue);
             } else {
                 context().forward(key, new Change<>(newValue, oldValue));
@@ -109,8 +114,8 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
         }
     }
 
-    private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
 
+    private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
         private final KTableValueGetter<K, V> parentGetter;
 
         KTableMapValuesValueGetter(final KTableValueGetter<K, V> parentGetter) {
@@ -127,7 +132,6 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
             return computeValue(key, parentGetter.get(key));
         }
 
-
         @Override
         public void close() {
             parentGetter.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
index 0c17d59..a84251c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -18,10 +18,11 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
-public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
+public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
     private final String storeName;
 
     KTableMaterializedValueGetterSupplier(final String storeName) {
@@ -38,21 +39,20 @@ public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueG
     }
 
     private class KTableMaterializedValueGetter implements KTableValueGetter<K, V> {
-        private KeyValueStore<K, V> store;
+        private TimestampedKeyValueStore<K, V> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+            store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
         @Override
         public V get(final K key) {
-            return store.get(key);
+            return getValueOrNull(store.get(key));
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 70db6443..3055f51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -21,7 +21,10 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
 public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
@@ -49,15 +52,19 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
     private class KTableReduceProcessor extends AbstractProcessor<K, Change<V>> {
 
-        private KeyValueStore<K, V> store;
-        private TupleForwarder<K, V> tupleForwarder;
+        private TimestampedKeyValueStore<K, V> store;
+        private TimestampedTupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
+            store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                store,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
         /**
@@ -70,7 +77,8 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
                 throw new StreamsException("Record key for KTable reduce operator with state " + storeName + " should not be null.");
             }
 
-            final V oldAgg = store.get(key);
+            final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
+            final V oldAgg = getValueOrNull(oldAggAndTimestamp);
             final V intermediateAgg;
 
             // first try to remove the old value
@@ -93,7 +101,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
             }
 
             // update the store with the new value
-            store.put(key, newAgg);
+            store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
             tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 6fc57bc..8b6ec6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -21,12 +21,15 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
     private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
 
@@ -66,8 +69,8 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
 
-        private KeyValueStore<K, V> store;
-        private TupleForwarder<K, V> tupleForwarder;
+        private TimestampedKeyValueStore<K, V> store;
+        private TimestampedTupleForwarder<K, V> tupleForwarder;
         private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
@@ -76,8 +79,12 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             if (queryableName != null) {
-                store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
+                store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    store,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
             }
         }
 
@@ -94,8 +101,9 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
             }
 
             if (queryableName != null) {
-                final V oldValue = sendOldValues ? store.get(key) : null;
-                store.put(key, value);
+                final ValueAndTimestamp<V> oldValueAndTimestamp = sendOldValues ? store.get(key) : null;
+                final V oldValue = getValueOrNull(oldValueAndTimestamp);
+                store.put(key, ValueAndTimestamp.make(value, context().timestamp()));
                 tupleForwarder.maybeForward(key, value, oldValue);
             } else {
                 context().forward(key, new Change<>(value, null));
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 6882dac..5ec33f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -17,10 +17,11 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
-public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
+public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
     private final String storeName;
 
     KTableSourceValueGetterSupplier(final String storeName) {
@@ -37,21 +38,18 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
     }
 
     private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
-
-        ReadOnlyKeyValueStore<K, V> store = null;
+        TimestampedKeyValueStore<K, V> store = null;
 
         @SuppressWarnings("unchecked")
         public void init(final ProcessorContext context) {
-            store = (ReadOnlyKeyValueStore<K, V>) context.getStateStore(storeName);
+            store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
         public V get(final K key) {
-            return store.get(key);
+            return getValueOrNull(store.get(key));
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 88cea4f..5d81711 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -22,12 +22,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.Objects;
 
-class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
+class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
     private final KTableImpl<K, ?, V> parent;
     private final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> transformerSupplier;
     private final String queryableName;
@@ -74,10 +76,11 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
         sendOldValues = true;
     }
 
+
     private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
         private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
-        private KeyValueStore<K, V1> store;
-        private TupleForwarder<K, V1> tupleForwarder;
+        private TimestampedKeyValueStore<K, V1> store;
+        private TimestampedTupleForwarder<K, V1> tupleForwarder;
 
         private KTableTransformValuesProcessor(final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
             this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
@@ -87,13 +90,14 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
-
             if (queryableName != null) {
-                final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context);
-                store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, flushListener, sendOldValues);
+                store = (TimestampedKeyValueStore<K, V1>) context.getStateStore(queryableName);
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    store,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
             }
         }
 
@@ -105,8 +109,8 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
                 final V1 oldValue = sendOldValues ? valueTransformer.transform(key, change.oldValue) : null;
                 context().forward(key, new Change<>(newValue, oldValue));
             } else {
-                final V1 oldValue = sendOldValues ? store.get(key) : null;
-                store.put(key, newValue);
+                final V1 oldValue = sendOldValues ? getValueOrNull(store.get(key)) : null;
+                store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
                 tupleForwarder.maybeForward(key, newValue, oldValue);
             }
         }
@@ -117,8 +121,8 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
         }
     }
 
-    private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
 
+    private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
         private final KTableValueGetter<K, V> parentGetter;
         private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
 
@@ -131,7 +135,6 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
         @Override
         public void init(final ProcessorContext context) {
             parentGetter.init(context);
-
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 8c731be..2106b1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 
+import java.time.Duration;
 import java.util.Objects;
 import java.util.Set;
 
@@ -193,7 +194,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
             }
             supplier = Stores.persistentSessionStore(
                 materialized.storeName(),
-                retentionPeriod
+                Duration.ofMillis(retentionPeriod)
             );
         }
         final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index f549ce9..a257603 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -31,8 +31,10 @@ import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 
 import java.time.Duration;
 import java.util.Objects;
@@ -154,7 +156,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
     }
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
-    private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
+    private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
         WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
             if (materialized.retention() != null) {
@@ -169,7 +171,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                                                            + " retention=[" + retentionPeriod + "]");
                 }
 
-                supplier = Stores.persistentWindowStore(
+                supplier = Stores.persistentTimestampedWindowStore(
                     materialized.storeName(),
                     Duration.ofMillis(retentionPeriod),
                     Duration.ofMillis(windows.size()),
@@ -190,16 +192,16 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                                                            + " retention=[" + windows.maintainMs() + "]");
                 }
 
-                supplier = Stores.persistentWindowStore(
+                supplier = new RocksDbWindowBytesStoreSupplier(
                     materialized.storeName(),
                     windows.maintainMs(),
-                    windows.segments,
+                    Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L),
                     windows.size(),
-                    false
-                );
+                    false,
+                    true);
             }
         }
-        final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(
+        final StoreBuilder<TimestampedWindowStore<K, VR>> builder = Stores.timestampedWindowStoreBuilder(
             supplier,
             materialized.keySerde(),
             materialized.valueSerde()
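
Note: in the deprecated fallback branch above, the old Windows#segments count is
converted into a segment interval in milliseconds (with a 60-second floor) for
RocksDbWindowBytesStoreSupplier. A worked example with assumed values, not taken
from the commit:

    // maintainMs = 1 day, segments = 3 (illustrative values only)
    final long maintainMs = 86_400_000L;
    final int segments = 3;
    final long segmentInterval = Math.max(maintainMs / (segments - 1), 60_000L);
    // segmentInterval == 43_200_000L, i.e. two 12-hour segments cover the retention period
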
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
new file mode 100644
index 0000000..5540376
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> {
+    private final InternalProcessorContext context;
+    private final ProcessorNode myNode;
+
+    TimestampedCacheFlushListener(final ProcessorContext context) {
+        this.context = (InternalProcessorContext) context;
+        myNode = this.context.currentNode();
+    }
+
+    @Override
+    public void apply(final K key,
+                      final ValueAndTimestamp<V> newValue,
+                      final ValueAndTimestamp<V> oldValue,
+                      final long timestamp) {
+        final ProcessorNode prev = context.currentNode();
+        context.setCurrentNode(myNode);
+        try {
+            context.forward(
+                key,
+                new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)),
+                To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp));
+        } finally {
+            context.setCurrentNode(prev);
+        }
+    }
+}
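
Note: the listener above decides which timestamp to attach when it forwards a
flushed entry downstream: the new value's own timestamp if a new value exists,
otherwise the timestamp handed in by the cache. A minimal sketch of the two
cases, with illustrative values (not part of the commit):

    // case 1: update -> forward with the new value's timestamp
    final ValueAndTimestamp<String> newValue = ValueAndTimestamp.make("v2", 42L);
    final long flushTimestamp = 73L;
    final long ts1 = newValue != null ? newValue.timestamp() : flushTimestamp;   // 42L

    // case 2: deletion (newValue == null) -> fall back to the flush timestamp
    final ValueAndTimestamp<String> deleted = null;
    final long ts2 = deleted != null ? deleted.timestamp() : flushTimestamp;     // 73L
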
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java
similarity index 79%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java
index 67872be..fb40b46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java
@@ -21,24 +21,25 @@ import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
-public class KeyValueStoreMaterializer<K, V> {
+public class TimestampedKeyValueStoreMaterializer<K, V> {
     private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
 
-    public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+    public TimestampedKeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         this.materialized = materialized;
     }
 
     /**
      * @return  StoreBuilder
      */
-    public StoreBuilder<KeyValueStore<K, V>> materialize() {
+    public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
         KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
             final String name = materialized.storeName();
-            supplier = Stores.persistentKeyValueStore(name);
+            supplier = Stores.persistentTimestampedKeyValueStore(name);
         }
-        final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(
+        final StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder(
             supplier,
             materialized.keySerde(),
             materialized.valueSerde());
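
Note: with the rename above, a KTable materialization without an explicit store
supplier now resolves to a timestamped key-value store. A minimal sketch of the
equivalent builder calls, assuming an illustrative store name and serdes:

    // roughly what materialize() produces for the default case (illustrative only)
    final StoreBuilder<TimestampedKeyValueStore<String, Long>> builder =
        Stores.timestampedKeyValueStoreBuilder(
            Stores.persistentTimestampedKeyValueStore("my-ktable-store"),   // assumed name
            Serdes.String(),                                                // assumed key serde
            Serdes.Long());                                                 // assumed value serde
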
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
similarity index 74%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 323e198..ab2b506 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -25,28 +25,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
  * Forwarding by this class only occurs when caching is not enabled. If caching is enabled,
  * forwarding occurs in the flush listener when the cached store flushes.
  *
- * @param <K>
- * @param <V>
+ * @param <K> the type of the key
+ * @param <V> the type of the value
  */
-class TupleForwarder<K, V> {
-    private final boolean cachingEnabled;
+class TimestampedTupleForwarder<K, V> {
     private final ProcessorContext context;
+    private final boolean sendOldValues;
+    private final boolean cachingEnabled;
 
     @SuppressWarnings("unchecked")
-    TupleForwarder(final StateStore store,
-                   final ProcessorContext context,
-                   final ForwardingCacheFlushListener<K, V> flushListener,
-                   final boolean sendOldValues) {
-        cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
+    TimestampedTupleForwarder(final StateStore store,
+                              final ProcessorContext context,
+                              final TimestampedCacheFlushListener<K, V> flushListener,
+                              final boolean sendOldValues) {
         this.context = context;
+        this.sendOldValues = sendOldValues;
+        cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
     public void maybeForward(final K key,
                              final V newValue,
                              final V oldValue) {
-        if (cachingEnabled) {
-            return;
+        if (!cachingEnabled) {
+            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
         }
-        context.forward(key, new Change<>(newValue, oldValue));
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index 323e198..94b0ebd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -25,28 +25,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
  * Forwarding by this class only occurs when caching is not enabled. If caching is enabled,
  * forwarding occurs in the flush listener when the cached store flushes.
  *
- * @param <K>
- * @param <V>
+ * @param <K> the type of the key
+ * @param <V> the type of the value
  */
 class TupleForwarder<K, V> {
-    private final boolean cachingEnabled;
     private final ProcessorContext context;
+    private final boolean sendOldValues;
+    private final boolean cachingEnabled;
 
     @SuppressWarnings("unchecked")
     TupleForwarder(final StateStore store,
                    final ProcessorContext context,
                    final ForwardingCacheFlushListener<K, V> flushListener,
                    final boolean sendOldValues) {
-        cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
         this.context = context;
+        this.sendOldValues = sendOldValues;
+        cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
     public void maybeForward(final K key,
                              final V newValue,
                              final V oldValue) {
-        if (cachingEnabled) {
-            return;
+        if (!cachingEnabled) {
+            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
         }
-        context.forward(key, new Change<>(newValue, oldValue));
     }
 }
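
Note: both forwarder variants above now apply the same two rules in
maybeForward(): nothing is forwarded while caching is enabled (the flush
listener forwards on flush instead), and the old value is only included when
sendOldValues is set. A compact summary of the cases, assuming caching is
disabled:

    // sendOldValues == false  ->  context.forward(key, new Change<>(newValue, null));
    // sendOldValues == true   ->  context.forward(key, new Change<>(newValue, oldValue));
    // (with caching enabled, maybeForward() is a no-op for both variants)
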
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 03bdda0..542726b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -22,8 +22,8 @@ import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
 import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Arrays;
 
@@ -36,7 +36,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
     private final Serde<VR> valueSerde;
     private final String[] joinThisStoreNames;
     private final String[] joinOtherStoreNames;
-    private final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+    private final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
 
     KTableKTableJoinNode(final String nodeName,
                          final ProcessorParameters<K, Change<V1>> joinThisProcessorParameters,
@@ -48,7 +48,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
                          final Serde<VR> valueSerde,
                          final String[] joinThisStoreNames,
                          final String[] joinOtherStoreNames,
-                         final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) {
+                         final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder) {
 
         super(nodeName,
             null,
@@ -98,23 +98,24 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
         final String otherProcessorName = otherProcessorParameters().processorName();
         final String mergeProcessorName = mergeProcessorParameters().processorName();
 
-        topologyBuilder.addProcessor(thisProcessorName,
+        topologyBuilder.addProcessor(
+            thisProcessorName,
             thisProcessorParameters().processorSupplier(),
             thisJoinSideNodeName());
 
-        topologyBuilder.addProcessor(otherProcessorName,
+        topologyBuilder.addProcessor(
+            otherProcessorName,
             otherProcessorParameters().processorSupplier(),
             otherJoinSideNodeName());
 
-        topologyBuilder.addProcessor(mergeProcessorName,
+        topologyBuilder.addProcessor(
+            mergeProcessorName,
             mergeProcessorParameters().processorSupplier(),
             thisProcessorName,
             otherProcessorName);
 
-        topologyBuilder.connectProcessorAndStateStores(thisProcessorName,
-            joinOtherStoreNames);
-        topologyBuilder.connectProcessorAndStateStores(otherProcessorName,
-            joinThisStoreNames);
+        topologyBuilder.connectProcessorAndStateStores(thisProcessorName, joinOtherStoreNames);
+        topologyBuilder.connectProcessorAndStateStores(otherProcessorName, joinThisStoreNames);
 
         if (storeBuilder != null) {
             topologyBuilder.addStateStore(storeBuilder, mergeProcessorName);
@@ -144,7 +145,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
         private String[] joinThisStoreNames;
         private String[] joinOtherStoreNames;
         private String queryableStoreName;
-        private StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
+        private StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
 
         private KTableKTableJoinNodeBuilder() {
         }
@@ -199,7 +200,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder) {
             this.storeBuilder = storeBuilder;
             return this;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index df009df..24df4d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -18,26 +18,26 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Arrays;
 
 public class TableProcessorNode<K, V> extends StreamsGraphNode {
 
     private final ProcessorParameters<K, V> processorParameters;
-    private final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+    private final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder;
     private final String[] storeNames;
 
     public TableProcessorNode(final String nodeName,
                               final ProcessorParameters<K, V> processorParameters,
-                              final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+                              final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder) {
         this(nodeName, processorParameters, storeBuilder, null);
     }
 
     public TableProcessorNode(final String nodeName,
                               final ProcessorParameters<K, V> processorParameters,
-                              final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
+                              final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder,
                               final String[] storeNames) {
         super(nodeName);
         this.processorParameters = processorParameters;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index fa979b2..b1df6ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -20,11 +20,12 @@ package org.apache.kafka.streams.kstream.internals.graph;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Collections;
 
@@ -82,10 +83,10 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final String topicName = getTopicNames().iterator().next();
 
-        // TODO: we assume source KTables can only be key-value stores for now.
+        // TODO: we assume source KTables can only be timestamped-key-value stores for now.
         // should be expanded for other types of stores as well.
-        final StoreBuilder<KeyValueStore<K, V>> storeBuilder =
-            new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
+        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
+            new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
 
         if (isGlobalKTable) {
             topologyBuilder.addGlobalStore(storeBuilder,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
index f9cc4c8..dc124aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/To.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor;
 
+import java.util.Objects;
+
 /**
  * This class is used to provide the optional parameters when sending output records to downstream processor
  * using {@link ProcessorContext#forward(Object, Object, To)}.
@@ -65,4 +67,25 @@ public class To {
         this.timestamp = timestamp;
         return this;
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final To to = (To) o;
+        return timestamp == to.timestamp &&
+            Objects.equals(childName, to.childName);
+    }
+
+    /**
+     * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable.
+     */
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException("To is unsafe for use in Hash collections");
+    }
 }
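
Note: equals() is added so that tests can match forward(key, value, To) arguments
by value (see TimestampedCacheFlushListenerTest below), while hashCode() throws
because the class is mutable. A minimal sketch, with illustrative timestamps:

    final To a = To.all().withTimestamp(42L);
    final To b = To.all().withTimestamp(42L);
    a.equals(b);      // true: same child name (none) and same timestamp
    // a.hashCode();  // would throw UnsupportedOperationException by design
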
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 0693ef7..3c8a3ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -25,10 +25,14 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedKeyValueStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedWindowStoreReadWriteDecorator;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -50,8 +54,12 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     public StateStore getStateStore(final String name) {
         final StateStore store = stateManager.getGlobalStore(name);
 
-        if (store instanceof KeyValueStore) {
+        if (store instanceof TimestampedKeyValueStore) {
+            return new TimestampedKeyValueStoreReadWriteDecorator((TimestampedKeyValueStore) store);
+        } else if (store instanceof KeyValueStore) {
             return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+        } else if (store instanceof TimestampedWindowStore) {
+            return new TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store);
         } else if (store instanceof WindowStore) {
             return new WindowStoreReadWriteDecorator((WindowStore) store);
         } else if (store instanceof SessionStore) {
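
Note: the order of the instanceof checks above matters, because the timestamped
interfaces are subtypes of the plain ones; checking the plain types first would
wrap a timestamped store with the wrong decorator. For reference:

    // TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>>
    // TimestampedWindowStore<K, V>   extends WindowStore<K, ValueAndTimestamp<V>>
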
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 232b28d..1df6610 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -498,11 +498,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
+    static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
         extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
         implements TimestampedKeyValueStore<K, V> {
 
-        private TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
+        TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
             super(inner);
         }
     }
@@ -564,7 +564,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class TimestampedWindowStoreReadWriteDecorator<K, V>
+    static class TimestampedWindowStoreReadWriteDecorator<K, V>
         extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
         implements TimestampedWindowStore<K, V> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
index 8bb652c..f5fc7a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
@@ -25,7 +25,7 @@ import java.util.Objects;
  *
  * @param <V>
  */
-public class ValueAndTimestamp<V> {
+public final class ValueAndTimestamp<V> {
     private final V value;
     private final long timestamp;
 
@@ -50,6 +50,18 @@ public class ValueAndTimestamp<V> {
         return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
     }
 
+    /**
+     * Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter
+     * if the parameter is not {@code null}.
+     *
+     * @param valueAndTimestamp a {@link ValueAndTimestamp} instance; can be {@code null}
+     * @param <V> the type of the value
+     * @return the wrapped {@code value} of {@code valueAndTimestamp} if not {@code null}; otherwise {@code null}
+     */
+    public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp) {
+        return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+    }
+
     public V value() {
         return value;
     }
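
Note: getValueOrNull() is a small null-safe accessor used by the new processors
(for example in TimestampedCacheFlushListener above). A minimal usage sketch
with illustrative values:

    final ValueAndTimestamp<String> vat = ValueAndTimestamp.make("hello", 10L);
    final String v1 = ValueAndTimestamp.getValueOrNull(vat);             // "hello"
    final String v2 = ValueAndTimestamp.<String>getValueOrNull(null);    // null, no NullPointerException
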
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 95e20b4..8aa0ceb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -24,16 +24,16 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-class CachingKeyValueStore
+public class CachingKeyValueStore
     extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
     implements KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 2fa7c96..468b554 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -51,6 +51,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
         serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
+            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index f43e4e6..863b44b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -19,12 +19,17 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
+import java.util.List;
 import java.util.Objects;
 
 public class TimestampedKeyValueStoreBuilder<K, V>
@@ -48,8 +53,12 @@ public class TimestampedKeyValueStoreBuilder<K, V>
     @Override
     public TimestampedKeyValueStore<K, V> build() {
         KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
-        if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
-            store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store);
+        if (!(store instanceof TimestampedBytesStore)) {
+            if (store.persistent()) {
+                store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store);
+            } else {
+                store = new InMemoryTimestampedKeyValueStoreMarker(store);
+            }
         }
         return new MeteredTimestampedKeyValueStore<>(
             maybeWrapCaching(maybeWrapLogging(store)),
@@ -72,4 +81,91 @@ public class TimestampedKeyValueStoreBuilder<K, V>
         }
         return new ChangeLoggingTimestampedKeyValueBytesStore(inner);
     }
+
+    private final static class InMemoryTimestampedKeyValueStoreMarker
+        implements KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {
+
+        final KeyValueStore<Bytes, byte[]> wrapped;
+
+        private InMemoryTimestampedKeyValueStoreMarker(final KeyValueStore<Bytes, byte[]> wrapped) {
+            if (wrapped.persistent()) {
+                throw new IllegalArgumentException("Provided store must not be a persistent store, but it is.");
+            }
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public void init(final ProcessorContext context,
+                         final StateStore root) {
+            wrapped.init(context, root);
+        }
+
+        @Override
+        public void put(final Bytes key,
+                        final byte[] value) {
+            wrapped.put(key, value);
+        }
+
+        @Override
+        public byte[] putIfAbsent(final Bytes key,
+                                  final byte[] value) {
+            return wrapped.putIfAbsent(key, value);
+        }
+
+        @Override
+        public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+            wrapped.putAll(entries);
+        }
+
+        @Override
+        public byte[] delete(final Bytes key) {
+            return wrapped.delete(key);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            return wrapped.get(key);
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                     final Bytes to) {
+            return wrapped.range(from, to);
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> all() {
+            return wrapped.all();
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return wrapped.approximateNumEntries();
+        }
+
+        @Override
+        public void flush() {
+            wrapped.flush();
+        }
+
+        @Override
+        public void close() {
+            wrapped.close();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return wrapped.isOpen();
+        }
+
+        @Override
+        public String name() {
+            return wrapped.name();
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+    }
 }
\ No newline at end of file
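
Note: build() above now distinguishes three cases instead of two. A store that
already implements TimestampedBytesStore is used as-is; a plain persistent store
is wrapped in the byte-level adapter; and a plain in-memory store only gets the
new marker wrapper, which delegates every call and, judging by the code, exists
solely to tag the store as already timestamp-capable so downstream layers do not
re-adapt its values. In outline:

    // store instanceof TimestampedBytesStore  -> use as-is
    // plain store, persistent()               -> KeyValueToTimestampedKeyValueByteStoreAdapter
    // plain store, in-memory                  -> InMemoryTimestampedKeyValueStoreMarker (marker only)
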
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index 2c7c950..808d31e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -19,11 +19,16 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Objects;
 
@@ -36,7 +41,7 @@ public class TimestampedWindowStoreBuilder<K, V>
                                          final Serde<K> keySerde,
                                          final Serde<V> valueSerde,
                                          final Time time) {
-        super(storeSupplier.name(), keySerde, new ValueAndTimestampSerde<>(valueSerde), time);
+        super(storeSupplier.name(), keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time);
         Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null");
         this.storeSupplier = storeSupplier;
     }
@@ -44,8 +49,12 @@ public class TimestampedWindowStoreBuilder<K, V>
     @Override
     public TimestampedWindowStore<K, V> build() {
         WindowStore<Bytes, byte[]> store = storeSupplier.get();
-        if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
-            store = new WindowToTimestampedWindowByteStoreAdapter(store);
+        if (!(store instanceof TimestampedBytesStore)) {
+            if (store.persistent()) {
+                store = new WindowToTimestampedWindowByteStoreAdapter(store);
+            } else {
+                store = new InMemoryTimestampedWindowStoreMarker(store);
+            }
         }
         return new MeteredTimestampedWindowStore<>(
             maybeWrapCaching(maybeWrapLogging(store)),
@@ -76,4 +85,96 @@ public class TimestampedWindowStoreBuilder<K, V>
     public long retentionPeriod() {
         return storeSupplier.retentionPeriod();
     }
+
+
+    private final static class InMemoryTimestampedWindowStoreMarker
+        implements WindowStore<Bytes, byte[]>, TimestampedBytesStore {
+
+        private final WindowStore<Bytes, byte[]> wrapped;
+
+        private InMemoryTimestampedWindowStoreMarker(final WindowStore<Bytes, byte[]> wrapped) {
+            if (wrapped.persistent()) {
+                throw new IllegalArgumentException("Provided store must not be a persistent store, but it is.");
+            }
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public void init(final ProcessorContext context,
+                         final StateStore root) {
+            wrapped.init(context, root);
+        }
+
+        @Override
+        public void put(final Bytes key,
+                        final byte[] value) {
+            wrapped.put(key, value);
+        }
+
+        @Override
+        public void put(final Bytes key,
+                        final byte[] value,
+                        final long windowStartTimestamp) {
+            wrapped.put(key, value, windowStartTimestamp);
+        }
+
+        @Override
+        public byte[] fetch(final Bytes key,
+                            final long time) {
+            return wrapped.fetch(key, time);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                                 final long timeFrom,
+                                                 final long timeTo) {
+            return wrapped.fetch(key, timeFrom, timeTo);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+                                                               final Bytes to,
+                                                               final long timeFrom,
+                                                               final long timeTo) {
+            return wrapped.fetch(from, to, timeFrom, timeTo);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
+                                                                  final long timeTo) {
+            return wrapped.fetchAll(timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+            return wrapped.all();
+        }
+
+        @Override
+        public void flush() {
+            wrapped.flush();
+        }
+
+        @Override
+        public void close() {
+            wrapped.close();
+        }
+        @Override
+        public boolean isOpen() {
+            return wrapped.isOpen();
+        }
+
+        @Override
+        public String name() {
+            return wrapped.name();
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+    }
 }
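
Note: besides the same in-memory marker treatment as the key-value builder, the
constructor above now guards against wrapping a null value serde. The intent, as
far as the surrounding code shows:

    // valueSerde == null means "use the default serde from the config"; wrapping it eagerly
    // in ValueAndTimestampSerde would lose that signal, so null is passed through and the
    // metered layer can fall back to the context's default value serde at init time (the
    // same pattern the MeteredTimestampedKeyValueStore hunk earlier in this diff shows).
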
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index afb2cc1..600f3db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -19,8 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.GenericInMemoryKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.test.GenericInMemoryTimestampedKeyValueStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.Test;
 
@@ -45,18 +46,19 @@ public class KTableReduceTest {
                 this::differenceNotNullArgs
             ).get();
 
-        final KeyValueStore<String, Set<String>> myStore = new GenericInMemoryKeyValueStore<>("myStore");
+        final TimestampedKeyValueStore<String, Set<String>> myStore =
+            new GenericInMemoryTimestampedKeyValueStore<>("myStore");
 
         context.register(myStore, null);
         reduceProcessor.init(context);
         context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, singleton("myStore")));
 
         reduceProcessor.process("A", new Change<>(singleton("a"), null));
-        assertEquals(singleton("a"), myStore.get("A"));
+        assertEquals(ValueAndTimestamp.make(singleton("a"), -1L), myStore.get("A"));
         reduceProcessor.process("A", new Change<>(singleton("b"), singleton("a")));
-        assertEquals(singleton("b"), myStore.get("A"));
+        assertEquals(ValueAndTimestamp.make(singleton("b"), -1L), myStore.get("A"));
         reduceProcessor.process("A", new Change<>(null, singleton("b")));
-        assertEquals(emptySet(), myStore.get("A"));
+        assertEquals(ValueAndTimestamp.make(emptySet(), -1L), myStore.get("A"));
     }
 
     private Set<String> differenceNotNullArgs(final Set<String> left, final Set<String> right) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index ed6b649..a9bf7f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -37,6 +37,8 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
@@ -90,7 +92,7 @@ public class KTableTransformValuesTest {
     @Mock(MockType.NICE)
     private KTableValueGetter<String, String> parentGetter;
     @Mock(MockType.NICE)
-    private KeyValueStore<String, String> stateStore;
+    private TimestampedKeyValueStore<String, String> stateStore;
     @Mock(MockType.NICE)
     private ValueTransformerWithKeySupplier<String, String, String> mockSupplier;
     @Mock(MockType.NICE)
@@ -220,7 +222,7 @@ public class KTableTransformValuesTest {
             new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), QUERYABLE_NAME);
 
         expect(context.getStateStore(QUERYABLE_NAME)).andReturn(stateStore);
-        expect(stateStore.get("Key")).andReturn("something");
+        expect(stateStore.get("Key")).andReturn(ValueAndTimestamp.make("something", 0L));
         replay(context, stateStore);
 
         final KTableValueGetter<String, String> getter = transformValues.view().get();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
new file mode 100644
index 0000000..38ef5c6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class TimestampedCacheFlushListenerTest {
+
+    @Test
+    public void shouldForwardValueTimestampIfNewValueExists() {
+        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        expect(context.currentNode()).andReturn(null).anyTimes();
+        context.setCurrentNode(null);
+        context.setCurrentNode(null);
+        context.forward(
+            "key",
+            new Change<>("newValue", "oldValue"),
+            To.all().withTimestamp(42L));
+        expectLastCall();
+        replay(context);
+
+        new TimestampedCacheFlushListener<>(context).apply(
+            "key",
+            ValueAndTimestamp.make("newValue", 42L),
+            ValueAndTimestamp.make("oldValue", 21L),
+            73L);
+
+        verify(context);
+    }
+
+    @Test
+    public void shouldForwardParameterTimestampIfNewValueIsNull() {
+        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        expect(context.currentNode()).andReturn(null).anyTimes();
+        context.setCurrentNode(null);
+        context.setCurrentNode(null);
+        context.forward(
+            "key",
+            new Change<>(null, "oldValue"),
+            To.all().withTimestamp(73L));
+        expectLastCall();
+        replay(context);
+
+        new TimestampedCacheFlushListener<>(context).apply(
+            "key",
+            null,
+            ValueAndTimestamp.make("oldValue", 21L),
+            73L);
+
+        verify(context);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
similarity index 66%
copy from streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
copy to streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 68c6eaa..068cb6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.junit.Test;
 
@@ -27,7 +28,7 @@ import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
-public class TupleForwarderTest {
+public class TimestampedTupleForwarderTest {
 
     @Test
     public void shouldSetFlushListenerOnWrappedStateStore() {
@@ -36,29 +37,38 @@ public class TupleForwarderTest {
     }
 
     private void setFlushListener(final boolean sendOldValues) {
-        final WrappedStateStore<StateStore, Object, Object> store = mock(WrappedStateStore.class);
-        final ForwardingCacheFlushListener<Object, Object> flushListener = mock(ForwardingCacheFlushListener.class);
+        final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class);
+        final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class);
 
         expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
         replay(store);
 
-        new TupleForwarder<>(store, null, flushListener, sendOldValues);
+        new TimestampedTupleForwarder<>(store, null, flushListener, sendOldValues);
 
         verify(store);
     }
 
     @Test
     public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
+        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
+        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
+    }
+
+    private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
         final ProcessorContext context = mock(ProcessorContext.class);
 
-        expect(store.setFlushListener(null, false)).andReturn(false);
-        context.forward("key", new Change<>("value", "oldValue"));
+        expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
+        if (sendOldValues) {
+            context.forward("key", new Change<>("newValue",  "oldValue"));
+        } else {
+            context.forward("key", new Change<>("newValue", null));
+        }
         expectLastCall();
         replay(store, context);
 
-        new TupleForwarder<>(store, context, null, false)
-            .maybeForward("key", "value", "oldValue");
+        new TimestampedTupleForwarder<>(store, context, null, sendOldValues)
+            .maybeForward("key", "newValue", "oldValue");
 
         verify(store, context);
     }
@@ -71,10 +81,9 @@ public class TupleForwarderTest {
         expect(store.setFlushListener(null, false)).andReturn(true);
         replay(store, context);
 
-        new TupleForwarder<>(store, context, null, false)
-            .maybeForward("key", "value", "oldValue");
+        new TimestampedTupleForwarder<>(store, context, null, false)
+            .maybeForward("key", "newValue", "oldValue");
 
         verify(store, context);
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
index 68c6eaa..f62e826 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
@@ -49,16 +49,25 @@ public class TupleForwarderTest {
 
     @Test
     public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
+        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
+        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
+    }
+
+    private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
         final ProcessorContext context = mock(ProcessorContext.class);
 
-        expect(store.setFlushListener(null, false)).andReturn(false);
-        context.forward("key", new Change<>("value", "oldValue"));
+        expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
+        if (sendOldValues) {
+            context.forward("key", new Change<>("newValue",  "oldValue"));
+        } else {
+            context.forward("key", new Change<>("newValue", null));
+        }
         expectLastCall();
         replay(store, context);
 
-        new TupleForwarder<>(store, context, null, false)
-            .maybeForward("key", "value", "oldValue");
+        new TupleForwarder<>(store, context, null, sendOldValues)
+            .maybeForward("key", "newValue", "oldValue");
 
         verify(store, context);
     }
@@ -72,7 +81,7 @@ public class TupleForwarderTest {
         replay(store, context);
 
         new TupleForwarder<>(store, context, null, false)
-            .maybeForward("key", "value", "oldValue");
+            .maybeForward("key", "newValue", "oldValue");
 
         verify(store, context);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index c0e0de3..a5db924 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -87,7 +87,7 @@ public class GlobalStreamThreadTest {
             );
 
         builder.addGlobalStore(
-            new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
+            new TimestampedKeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
             "sourceName",
             null,
             null,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8c8811d..0e50120 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -51,13 +51,15 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.WindowKeySchema;
 import org.apache.kafka.test.MockKeyValueStore;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.MockRestoreConsumer;
 import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -352,9 +354,9 @@ public class StandbyTaskTest {
 
         assertEquals(
             asList(
-                new KeyValue<>(new Windowed<>(1, new TimeWindow(0, 60_000)), 100L),
-                new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), 100L),
-                new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), 100L)
+                new KeyValue<>(new Windowed<>(1, new TimeWindow(0, 60_000)), ValueAndTimestamp.make(100L, 60_000L)),
+                new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), ValueAndTimestamp.make(100L, 120_000L)),
+                new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), ValueAndTimestamp.make(100L, 180_000L))
             ),
             getWindowedStoreContents(storeName, task)
         );
@@ -368,9 +370,9 @@ public class StandbyTaskTest {
         // the first record's window should have expired.
         assertEquals(
             asList(
-                new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), 100L),
-                new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), 100L),
-                new KeyValue<>(new Windowed<>(4, new TimeWindow(180_000, 240_000)), 100L)
+                new KeyValue<>(new Windowed<>(2, new TimeWindow(60_000, 120_000)), ValueAndTimestamp.make(100L, 120_000L)),
+                new KeyValue<>(new Windowed<>(3, new TimeWindow(120_000, 180_000)), ValueAndTimestamp.make(100L, 180_000L)),
+                new KeyValue<>(new Windowed<>(4, new TimeWindow(180_000, 240_000)), ValueAndTimestamp.make(100L, 240_000L))
             ),
             getWindowedStoreContents(storeName, task)
         );
@@ -388,7 +390,7 @@ public class StandbyTaskTest {
             changelogName,
             1,
             offset,
-            start,
+            end,
             TimestampType.CREATE_TIME,
             0L,
             0,
@@ -449,17 +451,17 @@ public class StandbyTaskTest {
     }
 
     @SuppressWarnings("unchecked")
-    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName,
-                                                                             final StandbyTask task) {
+    private List<KeyValue<Windowed<Integer>, ValueAndTimestamp<Long>>> getWindowedStoreContents(final String storeName,
+                                                                                                final StandbyTask task) {
         final StandbyContextImpl context = (StandbyContextImpl) task.context();
 
-        final List<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<>();
+        final List<KeyValue<Windowed<Integer>, ValueAndTimestamp<Long>>> result = new ArrayList<>();
 
-        try (final KeyValueIterator<Windowed<byte[]>, Long> iterator =
-                 ((WindowStore) context.getStateMgr().getStore(storeName)).all()) {
+        try (final KeyValueIterator<Windowed<byte[]>, ValueAndTimestamp<Long>> iterator =
+                 ((TimestampedWindowStore) context.getStateMgr().getStore(storeName)).all()) {
 
             while (iterator.hasNext()) {
-                final KeyValue<Windowed<byte[]>, Long> next = iterator.next();
+                final KeyValue<Windowed<byte[]>, ValueAndTimestamp<Long>> next = iterator.next();
                 final Integer deserializedKey = new IntegerDeserializer().deserialize(null, next.key.key());
                 result.add(new KeyValue<>(new Windowed<>(deserializedKey, next.key.window()), next.value));
             }
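
Note on the assertions above: with the switch to timestamped stores, a TimestampedWindowStore returns
ValueAndTimestamp<V> rather than a plain V, which is why the expected values are now built with
ValueAndTimestamp.make(). A minimal, self-contained sketch of building and unpacking such a value
(the class name is made up for illustration):

    import org.apache.kafka.streams.state.ValueAndTimestamp;

    public class ValueAndTimestampSketch {
        public static void main(final String[] args) {
            // wrap a value together with the timestamp of the record that produced it
            final ValueAndTimestamp<Long> vat = ValueAndTimestamp.make(100L, 60_000L);

            // unpack the two components again
            final long value = vat.value();         // 100
            final long timestamp = vat.timestamp(); // 60000

            System.out.println(value + " @ " + timestamp);
        }
    }
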
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java
similarity index 66%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java
index 30080c3..f963786 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java
@@ -20,16 +20,18 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
 import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
-import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -44,7 +46,7 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
 
 @RunWith(EasyMockRunner.class)
-public class KeyValueStoreMaterializerTest {
+public class TimestampedKeyValueStoreMaterializerTest {
 
     private final String storePrefix = "prefix";
     @Mock(type = MockType.NICE)
@@ -55,14 +57,14 @@ public class KeyValueStoreMaterializerTest {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized =
             new MaterializedInternal<>(Materialized.as("store"), nameProvider, storePrefix);
 
-        final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
-        final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
-        final KeyValueStore<String, String> store = builder.build();
+        final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize();
+        final TimestampedKeyValueStore<String, String> store = builder.build();
         final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
         final StateStore logging = caching.wrapped();
-        assertThat(store, instanceOf(MeteredKeyValueStore.class));
-        assertThat(caching, instanceOf(CachedStateStore.class));
-        assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+        assertThat(store, instanceOf(MeteredTimestampedKeyValueStore.class));
+        assertThat(caching, instanceOf(CachingKeyValueStore.class));
+        assertThat(logging, instanceOf(ChangeLoggingTimestampedKeyValueBytesStore.class));
     }
 
     @Test
@@ -70,9 +72,9 @@ public class KeyValueStoreMaterializerTest {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
             Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled(), nameProvider, storePrefix
         );
-        final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
-        final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
-        final KeyValueStore<String, String> store = builder.build();
+        final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize();
+        final TimestampedKeyValueStore<String, String> store = builder.build();
         final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
         assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
     }
@@ -82,11 +84,11 @@ public class KeyValueStoreMaterializerTest {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
             Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withLoggingDisabled(), nameProvider, storePrefix
         );
-        final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
-        final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
-        final KeyValueStore<String, String> store = builder.build();
+        final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize();
+        final TimestampedKeyValueStore<String, String> store = builder.build();
         final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
-        assertThat(caching, instanceOf(CachedStateStore.class));
+        assertThat(caching, instanceOf(CachingKeyValueStore.class));
         assertThat(caching.wrapped(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
     }
 
@@ -95,11 +97,11 @@ public class KeyValueStoreMaterializerTest {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
             Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, storePrefix
         );
-        final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
-        final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
-        final KeyValueStore<String, String> store = builder.build();
+        final TimestampedKeyValueStoreMaterializer<String, String> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<TimestampedKeyValueStore<String, String>> builder = materializer.materialize();
+        final TimestampedKeyValueStore<String, String> store = builder.build();
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertThat(wrapped, not(instanceOf(CachedStateStore.class)));
+        assertThat(wrapped, not(instanceOf(CachingKeyValueStore.class)));
         assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
     }
 
@@ -113,9 +115,9 @@ public class KeyValueStoreMaterializerTest {
 
         final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized =
             new MaterializedInternal<>(Materialized.as(supplier), nameProvider, storePrefix);
-        final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
-        final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
-        final KeyValueStore<String, Integer> built = builder.build();
+        final TimestampedKeyValueStoreMaterializer<String, Integer> materializer = new TimestampedKeyValueStoreMaterializer<>(materialized);
+        final StoreBuilder<TimestampedKeyValueStore<String, Integer>> builder = materializer.materialize();
+        final TimestampedKeyValueStore<String, Integer> built = builder.build();
 
         assertThat(store.name(), CoreMatchers.equalTo(built.name()));
     }
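
The renamed materializer test verifies the wrapper chain around the timestamped inner store: metered on
the outside, then caching, then change-logging, with the latter two removable through Materialized. A
hedged sketch of how an application toggles those layers (topic and store names are placeholders, not
taken from this commit):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class MaterializedTogglesSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();

            // "input-topic" and "store" are placeholder names; with caching and
            // logging disabled, only the metered layer wraps the inner store.
            builder.table(
                "input-topic",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String())
                    .withCachingDisabled()
                    .withLoggingDisabled());

            builder.build();
        }
    }
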
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index e520df4..8c83271 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -218,6 +218,17 @@ public class StoresTest {
     }
 
     @Test
+    public void shouldBuildTimestampedKeyValueStoreThatWrapsInMemoryKeyValueStore() {
+        final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("name"),
+            Serdes.String(),
+            Serdes.String()
+        ).withLoggingDisabled().withCachingDisabled().build();
+        assertThat(store, not(nullValue()));
+        assertThat(((WrappedStateStore) store).wrapped(), instanceOf(TimestampedBytesStore.class));
+    }
+
+    @Test
     public void shouldBuildWindowStore() {
         final WindowStore<String, String> store = Stores.windowStoreBuilder(
             Stores.persistentWindowStore("store", ofMillis(3L), ofMillis(3L), true),
@@ -238,6 +249,27 @@ public class StoresTest {
     }
 
     @Test
+    public void shouldBuildTimestampedWindowStoreThatWrapsWindowStore() {
+        final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder(
+            Stores.persistentWindowStore("store", ofMillis(3L), ofMillis(3L), true),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
+        assertThat(store, not(nullValue()));
+    }
+
+    @Test
+    public void shouldBuildTimestampedWindowStoreThatWrapsInMemoryWindowStore() {
+        final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder(
+            Stores.inMemoryWindowStore("store", ofMillis(3L), ofMillis(3L), true),
+            Serdes.String(),
+            Serdes.String()
+        ).withLoggingDisabled().withCachingDisabled().build();
+        assertThat(store, not(nullValue()));
+        assertThat(((WrappedStateStore) store).wrapped(), instanceOf(TimestampedBytesStore.class));
+    }
+
+    @Test
     public void shouldBuildSessionStore() {
         final SessionStore<String, String> store = Stores.sessionStoreBuilder(
             Stores.persistentSessionStore("name", ofMillis(10)),
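
The new StoresTest cases exercise the public builders for timestamped stores. A short sketch of the same
builder calls outside the test harness, here using the persistent timestamped suppliers (store names are
placeholders; as the tests above show, the builders also accept plain key-value and window suppliers):

    import static java.time.Duration.ofMillis;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.TimestampedKeyValueStore;
    import org.apache.kafka.streams.state.TimestampedWindowStore;

    public class TimestampedStoreBuilderSketch {
        public static void main(final String[] args) {
            // key-value store that tracks the latest record timestamp per key
            final StoreBuilder<TimestampedKeyValueStore<String, String>> kvBuilder =
                Stores.timestampedKeyValueStoreBuilder(
                    Stores.persistentTimestampedKeyValueStore("kv-store"),
                    Serdes.String(),
                    Serdes.String());

            // window store variant; retention, window size, and retainDuplicates
            // mirror the plain persistentWindowStore() parameters
            final StoreBuilder<TimestampedWindowStore<String, String>> windowBuilder =
                Stores.timestampedWindowStoreBuilder(
                    Stores.persistentTimestampedWindowStore("window-store", ofMillis(3L), ofMillis(3L), true),
                    Serdes.String(),
                    Serdes.String());

            System.out.println(kvBuilder.name() + ", " + windowBuilder.name());
        }
    }
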
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
index 7b0eb6d..1ba3d37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
@@ -18,11 +18,9 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
@@ -46,14 +44,16 @@ public class TimestampedKeyValueStoreBuilderTest {
     @Mock(type = MockType.NICE)
     private KeyValueBytesStoreSupplier supplier;
     @Mock(type = MockType.NICE)
-    private KeyValueStore<Bytes, byte[]> inner;
+    private RocksDBTimestampedStore inner;
     private TimestampedKeyValueStoreBuilder<String, String> builder;
 
     @Before
     public void setUp() {
         expect(supplier.get()).andReturn(inner);
         expect(supplier.name()).andReturn("name");
-        replay(supplier);
+        expect(inner.persistent()).andReturn(true).anyTimes();
+        replay(supplier, inner);
+
         builder = new TimestampedKeyValueStoreBuilder<>(
             supplier,
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
index 7be31ea..e6d3da5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -18,12 +18,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.streams.state.WindowStore;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.easymock.MockType;
@@ -46,14 +44,15 @@ public class TimestampedWindowStoreBuilderTest {
     @Mock(type = MockType.NICE)
     private WindowBytesStoreSupplier supplier;
     @Mock(type = MockType.NICE)
-    private WindowStore<Bytes, byte[]> inner;
+    private RocksDBTimestampedWindowStore inner;
     private TimestampedWindowStoreBuilder<String, String> builder;
 
     @Before
     public void setUp() {
         expect(supplier.get()).andReturn(inner);
         expect(supplier.name()).andReturn("name");
-        replay(supplier);
+        expect(inner.persistent()).andReturn(true).anyTimes();
+        replay(supplier, inner);
 
         builder = new TimestampedWindowStoreBuilder<>(
             supplier,
diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
new file mode 100644
index 0000000..67a67c9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * This class is a generic, timestamped version of the in-memory key-value store. It is useful for testing
+ * when you need a basic TimestampedKeyValueStore for arbitrary types and don't have or want to write a serde.
+ */
+public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
+    extends WrappedStateStore<StateStore, K, ValueAndTimestamp<V>>
+    implements TimestampedKeyValueStore<K, V> {
+
+    private final String name;
+    private final NavigableMap<K, ValueAndTimestamp<V>> map;
+    private volatile boolean open = false;
+
+    public GenericInMemoryTimestampedKeyValueStore(final String name) {
+        // it's not really a `WrappedStateStore` so we pass `null`
+        // however, we need to implement `WrappedStateStore` to make the store usable
+        super(null);
+        this.name = name;
+
+        this.map = new TreeMap<>();
+    }
+
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    /* This is a "dummy" store used for testing;
+       it does not support restoring from changelog since we allow it to be serde-ignorant */
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (root != null) {
+            context.register(root, null);
+        }
+
+        this.open = true;
+    }
+
+    @Override
+    public boolean setFlushListener(final CacheFlushListener<K, ValueAndTimestamp<V>> listener,
+                                    final boolean sendOldValues) {
+        return false;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.open;
+    }
+
+    @Override
+    public synchronized ValueAndTimestamp<V> get(final K key) {
+        return this.map.get(key);
+    }
+
+    @Override
+    public synchronized void put(final K key,
+                                 final ValueAndTimestamp<V> value) {
+        if (value == null) {
+            this.map.remove(key);
+        } else {
+            this.map.put(key, value);
+        }
+    }
+
+    @Override
+    public synchronized ValueAndTimestamp<V> putIfAbsent(final K key,
+                                                         final ValueAndTimestamp<V> value) {
+        final ValueAndTimestamp<V> originalValue = get(key);
+        if (originalValue == null) {
+            put(key, value);
+        }
+        return originalValue;
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<K, ValueAndTimestamp<V>>> entries) {
+        for (final KeyValue<K, ValueAndTimestamp<V>> entry : entries) {
+            put(entry.key, entry.value);
+        }
+    }
+
+    @Override
+    public synchronized ValueAndTimestamp<V> delete(final K key) {
+        return this.map.remove(key);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> range(final K from,
+                                                                        final K to) {
+        return new DelegatingPeekingKeyValueIterator<>(
+            name,
+            new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> all() {
+        final TreeMap<K, ValueAndTimestamp<V>> copy = new TreeMap<>(this.map);
+        return new DelegatingPeekingKeyValueIterator<>(name, new GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return this.map.size();
+    }
+
+    @Override
+    public void flush() {
+        // do-nothing since it is in-memory
+    }
+
+    @Override
+    public void close() {
+        this.map.clear();
+        this.open = false;
+    }
+
+    private static class GenericInMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, ValueAndTimestamp<V>> {
+        private final Iterator<Entry<K, ValueAndTimestamp<V>>> iter;
+
+        private GenericInMemoryKeyValueIterator(final Iterator<Map.Entry<K, ValueAndTimestamp<V>>> iter) {
+            this.iter = iter;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, ValueAndTimestamp<V>> next() {
+            final Map.Entry<K, ValueAndTimestamp<V>> entry = iter.next();
+            return new KeyValue<>(entry.getKey(), entry.getValue());
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+
+        @Override
+        public K peekNextKey() {
+            throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+        }
+    }
+}
\ No newline at end of file
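
Finally, a minimal sketch of how a test could use the GenericInMemoryTimestampedKeyValueStore helper added
above (the sketch class name and sample values are made up for illustration):

    import org.apache.kafka.streams.state.ValueAndTimestamp;
    import org.apache.kafka.test.GenericInMemoryTimestampedKeyValueStore;

    public class GenericTimestampedStoreUsageSketch {
        public static void main(final String[] args) {
            final GenericInMemoryTimestampedKeyValueStore<Integer, String> store =
                new GenericInMemoryTimestampedKeyValueStore<>("my-store");

            // no serdes needed: keys and values are kept as plain Java objects
            store.put(1, ValueAndTimestamp.make("a", 10L));
            store.put(2, ValueAndTimestamp.make("b", 20L));

            // putting null removes the key, mirroring delete()
            store.put(2, null);

            System.out.println(store.get(1));                   // value "a" with timestamp 10
            System.out.println(store.approximateNumEntries());  // 1
        }
    }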