You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/07 02:33:45 UTC

[kafka] 01/05: KAFKA-10562: Delegate store wrappers to new init method

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 59e3e34b44a68627718a7f76ae39b08906e97ef5
Author: John Roesler <vv...@apache.org>
AuthorDate: Thu Oct 1 10:19:59 2020 -0500

    KAFKA-10562: Delegate store wrappers to new init method
---
 .../processor/internals/ProcessorContextUtils.java |  7 +++
 .../state/internals/CachingKeyValueStore.java      | 29 ++++++++++--
 .../state/internals/CachingSessionStore.java       | 17 +++++++
 .../state/internals/CachingWindowStore.java        | 12 +++++
 .../internals/ChangeLoggingKeyValueBytesStore.java | 26 +++++++++++
 .../internals/ChangeLoggingSessionBytesStore.java  | 17 +++++++
 .../internals/ChangeLoggingWindowBytesStore.java   | 13 ++++++
 ...ValueToTimestampedKeyValueByteStoreAdapter.java |  6 +++
 .../state/internals/MeteredKeyValueStore.java      | 52 +++++++++++++++++----
 .../state/internals/MeteredSessionStore.java       | 53 ++++++++++++++++++----
 .../internals/MeteredTimestampedKeyValueStore.java | 13 ++++++
 .../internals/MeteredTimestampedWindowStore.java   | 15 ++++++
 .../state/internals/MeteredWindowStore.java        | 50 ++++++++++++++++----
 .../state/internals/RocksDBWindowStore.java        | 14 ++++--
 .../internals/TimestampedKeyValueStoreBuilder.java |  6 +++
 .../internals/TimestampedWindowStoreBuilder.java   |  6 +++
 .../WindowToTimestampedWindowByteStoreAdapter.java |  6 +++
 .../streams/state/internals/WrappedStateStore.java |  6 +++
 .../ChangeLoggingSessionBytesStoreTest.java        |  3 +-
 ...angeLoggingTimestampedWindowBytesStoreTest.java |  3 +-
 .../ChangeLoggingWindowBytesStoreTest.java         |  3 +-
 .../streams/internals/KeyValueStoreFacade.java     |  6 +++
 .../kafka/streams/internals/WindowStoreFacade.java |  6 +++
 23 files changed, 332 insertions(+), 37 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 78b1ff5..41a1197 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 /**
@@ -52,4 +53,10 @@ public final class ProcessorContextUtils {
             ? ((InternalProcessorContext) context).changelogFor(storeName)
             : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
     }
+
+    public static String changelogFor(final StateStoreContext context, final String storeName) {
+        return context instanceof InternalProcessorContext
+            ? ((InternalProcessorContext) context).changelogFor(storeName)
+            : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index cae38e0..ba2f949 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -57,20 +58,40 @@ public class CachingKeyValueStore
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        initInternal(context);
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
+        initInternal((InternalProcessorContext) context);
+        super.init(context, root);
+        // save the stream thread as we only ever want to trigger a flush
+        // when the stream thread is the current thread.
+        streamThread = Thread.currentThread();
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
+        initInternal((InternalProcessorContext) context);
         super.init(context, root);
         // save the stream thread as we only ever want to trigger a flush
         // when the stream thread is the current thread.
         streamThread = Thread.currentThread();
     }
 
-    private void initInternal(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+    private void initInternal(final InternalProcessorContext context) {
+        this.context = context;
 
         this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name());
         this.context.registerCacheFlushListener(cacheName, entries -> {
             for (final ThreadCache.DirtyEntry entry : entries) {
-                putAndMaybeForward(entry, (InternalProcessorContext) context);
+                putAndMaybeForward(entry, context);
             }
         });
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 4ac43a2..616bb00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordQueue;
@@ -63,6 +64,22 @@ class CachingSessionStore
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
+        initInternal((InternalProcessorContext) context);
+        super.init(context, root);
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
         initInternal((InternalProcessorContext) context);
         super.init(context, root);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 20be3a3..936f09c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -78,6 +79,17 @@ class CachingWindowStore
         super.init(context, root);
     }
 
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
+        initInternal((InternalProcessorContext) context);
+        super.init(context, root);
+    }
+
     private void initInternal(final InternalProcessorContext context) {
         this.context = context;
         final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 236f218..d5205ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -40,6 +41,31 @@ public class ChangeLoggingKeyValueBytesStore
     public void init(final ProcessorContext context,
                      final StateStore root) {
         super.init(context, root);
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
+        this.context = (InternalProcessorContext) context;
+
+        // if the inner store is an LRU cache, add the eviction listener to log removed record
+        if (wrapped() instanceof MemoryLRUCache) {
+            ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
+                // pass null to indicate removal
+                log(key, null);
+            });
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        super.init(context, root);
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
         this.context = (InternalProcessorContext) context;
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index cc586d3..648a47e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -41,6 +42,22 @@ class ChangeLoggingSessionBytesStore
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         super.init(context, root);
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
+        this.context = (InternalProcessorContext) context;
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        super.init(context, root);
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
         this.context = (InternalProcessorContext) context;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 8da413c..6eee6c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
@@ -56,6 +57,18 @@ class ChangeLoggingWindowBytesStore
     }
 
     @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        if (!(context instanceof InternalProcessorContext)) {
+            throw new IllegalArgumentException(
+                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+            );
+        }
+        this.context = (InternalProcessorContext) context;
+        super.init(context, root);
+    }
+
+    @Override
     public byte[] fetch(final Bytes key,
                         final long timestamp) {
         return wrapped().fetch(key, timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index fa29974..bb2ef26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -88,6 +89,11 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
     }
 
     @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
     public void flush() {
         store.flush();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 31e2eff..8ca7686 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -65,7 +67,7 @@ public class MeteredKeyValueStore<K, V>
     private Sensor rangeSensor;
     private Sensor flushSensor;
     private Sensor e2eLatencySensor;
-    private ProcessorContext context;
+    private InternalProcessorContext context;
     private StreamsMetricsImpl streamsMetrics;
     private final String threadId;
     private String taskId;
@@ -86,11 +88,36 @@ public class MeteredKeyValueStore<K, V>
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();
+        final Sensor restoreSensor =
+            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+        // register and possibly restore the state from the logs
+        maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        taskId = context.taskId().toString();
+        initStoreSerde(context);
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+        registerMetrics();
+        final Sensor restoreSensor =
+            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+        // register and possibly restore the state from the logs
+        maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+    }
+
+    private void registerMetrics() {
         putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
         putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
         putAllSensor = StateStoreMetrics.putAllSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
@@ -100,11 +127,6 @@ public class MeteredKeyValueStore<K, V>
         flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
         deleteSensor = StateStoreMetrics.deleteSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
-        final Sensor restoreSensor =
-            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-
-        // register and possibly restore the state from the logs
-        maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
 
     @SuppressWarnings("unchecked")
@@ -112,6 +134,18 @@ public class MeteredKeyValueStore<K, V>
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
         serdes = new StateSerdes<>(
+            changelogTopic != null ?
+                changelogTopic :
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
+    void initStoreSerde(final StateStoreContext context) {
+        final String storeName = name();
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+        serdes = new StateSerdes<>(
              changelogTopic != null ?
                 changelogTopic :
                 ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
@@ -250,7 +284,9 @@ public class MeteredKeyValueStore<K, V>
     }
 
     private void maybeRecordE2ELatency() {
-        if (e2eLatencySensor.shouldRecord()) {
+        // Context is null if the provided context isn't an implementation of InternalProcessorContext.
+        // In that case, we _can't_ get the current timestamp, so we don't record anything.
+        if (e2eLatencySensor.shouldRecord() && context != null) {
             final long currentTime = time.milliseconds();
             final long e2eLatency =  currentTime - context.timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index d8ce02a..96fed94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -51,7 +53,7 @@ public class MeteredSessionStore<K, V>
     private Sensor flushSensor;
     private Sensor removeSensor;
     private Sensor e2eLatencySensor;
-    private ProcessorContext context;
+    private InternalProcessorContext context;
     private final String threadId;
     private String taskId;
 
@@ -71,16 +73,12 @@ public class MeteredSessionStore<K, V>
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         initStoreSerde(context);
         taskId = context.taskId().toString();
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
-        putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+        registerMetrics();
         final Sensor restoreSensor =
             StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
 
@@ -88,6 +86,30 @@ public class MeteredSessionStore<K, V>
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
 
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        initStoreSerde(context);
+        taskId = context.taskId().toString();
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+        registerMetrics();
+        final Sensor restoreSensor =
+            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+        // register and possibly restore the state from the logs
+        maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+    }
+
+    private void registerMetrics() {
+        putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+        fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+        flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+        removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+    }
+
     @SuppressWarnings("unchecked")
     private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
@@ -102,6 +124,19 @@ public class MeteredSessionStore<K, V>
     }
 
     @SuppressWarnings("unchecked")
+    private void initStoreSerde(final StateStoreContext context) {
+        final String storeName = name();
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+        serdes = new StateSerdes<>(
+            changelogTopic != null ?
+                changelogTopic :
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde
+        );
+    }
+
+    @SuppressWarnings("unchecked")
     @Override
     public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
                                     final boolean sendOldValues) {
@@ -255,7 +290,9 @@ public class MeteredSessionStore<K, V>
     }
 
     private void maybeRecordE2ELatency() {
-        if (e2eLatencySensor.shouldRecord()) {
+        // Context is null if the provided context isn't an implementation of InternalProcessorContext.
+        // In that case, we _can't_ get the current timestamp, so we don't record anything.
+        if (e2eLatencySensor.shouldRecord() && context != null) {
             final long currentTime = time.milliseconds();
             final long e2eLatency =  currentTime - context.timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 022c1f8..e4e6d08 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -62,6 +63,18 @@ public class MeteredTimestampedKeyValueStore<K, V>
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
     }
 
+    @SuppressWarnings("unchecked")
+    void initStoreSerde(final StateStoreContext context) {
+        final String storeName = name();
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+        serdes = new StateSerdes<>(
+            changelogTopic != null ?
+                changelogTopic :
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
+    }
+
     public RawAndDeserializedValue<V> getWithBinary(final K key) {
         try {
             return maybeMeasureLatency(() -> { 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index 3c386f3..0d83aba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -61,4 +62,18 @@ class MeteredTimestampedWindowStore<K, V>
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde
         );
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    void initStoreSerde(final StateStoreContext context) {
+        final String storeName = name();
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+        serdes = new StateSerdes<>(
+            changelogTopic != null ?
+                changelogTopic :
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde
+        );
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 85c3ec2..c4cd1f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -50,7 +52,7 @@ public class MeteredWindowStore<K, V>
     private Sensor fetchSensor;
     private Sensor flushSensor;
     private Sensor e2eLatencySensor;
-    private ProcessorContext context;
+    private InternalProcessorContext context;
     private final String threadId;
     private String taskId;
 
@@ -72,15 +74,28 @@ public class MeteredWindowStore<K, V>
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
         taskId = context.taskId().toString();
 
-        putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+        registerMetrics();
+        final Sensor restoreSensor =
+            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+        // register and possibly restore the state from the logs
+        maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        initStoreSerde(context);
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+        taskId = context.taskId().toString();
+
+        registerMetrics();
         final Sensor restoreSensor =
             StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
 
@@ -88,6 +103,13 @@ public class MeteredWindowStore<K, V>
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
 
+    private void registerMetrics() {
+        putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+        fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+        flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+    }
+
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
@@ -101,6 +123,18 @@ public class MeteredWindowStore<K, V>
     }
 
     @SuppressWarnings("unchecked")
+    void initStoreSerde(final StateStoreContext context) {
+        final String storeName = name();
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+        serdes = new StateSerdes<>(
+            changelogTopic != null ?
+                changelogTopic :
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
     @Override
     public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
                                     final boolean sendOldValues) {
@@ -122,7 +156,7 @@ public class MeteredWindowStore<K, V>
     @Override
     public void put(final K key,
                     final V value) {
-        put(key, value, context.timestamp());
+        put(key, value, context != null ? context.timestamp() : 0L);
     }
 
     @Override
@@ -264,7 +298,7 @@ public class MeteredWindowStore<K, V>
     }
 
     private void maybeRecordE2ELatency() {
-        if (e2eLatencySensor.shouldRecord()) {
+        if (e2eLatencySensor.shouldRecord() && context != null) {
             final long currentTime = time.milliseconds();
             final long e2eLatency =  currentTime - context.timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 53f0615..7c779a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -31,7 +33,7 @@ public class RocksDBWindowStore
     private final boolean retainDuplicates;
     private final long windowSize;
 
-    private ProcessorContext context;
+    private InternalProcessorContext context;
     private int seqnum = 0;
 
     RocksDBWindowStore(final SegmentedBytesStore bytesStore,
@@ -44,14 +46,20 @@ public class RocksDBWindowStore
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        super.init(context, root);
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         super.init(context, root);
     }
 
     @Deprecated
     @Override
     public void put(final Bytes key, final byte[] value) {
-        put(key, value, context.timestamp());
+        put(key, value, context != null ? context.timestamp() : 0L);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index be8f259..522472d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -101,6 +102,11 @@ public class TimestampedKeyValueStoreBuilder<K, V>
         }
 
         @Override
+        public void init(final StateStoreContext context, final StateStore root) {
+            wrapped.init(context, root);
+        }
+
+        @Override
         public void put(final Bytes key,
                         final byte[] value) {
             wrapped.put(key, value);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index a544262..454b123 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -114,6 +115,11 @@ public class TimestampedWindowStoreBuilder<K, V>
             wrapped.init(context, root);
         }
 
+        @Override
+        public void init(final StateStoreContext context, final StateStore root) {
+            wrapped.init(context, root);
+        }
+
         @Deprecated
         @Override
         public void put(final Bytes key,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index da32599..bfeb33e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -167,6 +168,11 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
     }
 
     @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
     public void flush() {
         store.flush();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 208bc06..c4b4b53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 
 /**
@@ -48,6 +49,11 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
         wrapped.init(context, root);
     }
 
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        wrapped.init(context, root);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public boolean setFlushListener(final CacheFlushListener<K, V> listener,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 426a334..8753342 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -57,7 +56,7 @@ public class ChangeLoggingSessionBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init((ProcessorContext) context, store);
+        inner.init((StateStoreContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 9de2207..e928678 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -59,7 +58,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init((ProcessorContext) context, store);
+        inner.init((StateStoreContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index c877ac6..82835bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -57,7 +56,7 @@ public class ChangeLoggingWindowBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init((ProcessorContext) context, store);
+        inner.init((StateStoreContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
index 749c41f..bfb8433 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -42,6 +43,11 @@ public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V>
     }
 
     @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        inner.init(context, root);
+    }
+
+    @Override
     public void put(final K key,
                     final V value) {
         inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
index 6945059..6186c2a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -41,6 +42,11 @@ public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> imp
         inner.init(context, root);
     }
 
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        inner.init(context, root);
+    }
+
     @Deprecated
     @Override
     public void put(final K key,