You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/07 03:18:33 UTC

[kafka] branch kip-478-part-5-state-store-wrappers updated (ccc12cd -> 4452d1b)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git.


 discard ccc12cd  done
     new 4452d1b  done

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (ccc12cd)
            \
             N -- N -- N   refs/heads/kip-478-part-5-state-store-wrappers (4452d1b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)


[kafka] 01/01: done

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4452d1b9337249d2cc6c801a9e08675425cdcbcf
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Oct 6 21:33:59 2020 -0500

    done
---
 .../examples/wordcount/WordCountProcessorTest.java |  1 +
 .../wordcount/WordCountTransformerTest.java        |  1 +
 .../internals/AbstractReadOnlyDecorator.java       |  6 ++++
 .../internals/AbstractReadWriteDecorator.java      |  9 ++++-
 .../internals/ChangeLoggingKeyValueBytesStore.java |  3 +-
 .../streams/state/internals/KeyValueSegment.java   |  1 -
 .../streams/state/internals/RocksDBStore.java      |  3 --
 .../state/internals/TimestampedSegment.java        |  1 -
 .../internals/GlobalProcessorContextImplTest.java  |  1 -
 .../state/internals/AbstractKeyValueStoreTest.java |  1 -
 .../state/internals/CachingKeyValueStoreTest.java  | 25 +++++++++++++
 .../state/internals/CachingSessionStoreTest.java   | 28 ++++++++++++++-
 .../state/internals/CachingWindowStoreTest.java    | 27 +++++++++++++-
 .../ChangeLoggingKeyValueBytesStoreTest.java       | 42 +++++++++++++++++++---
 ...geLoggingTimestampedKeyValueBytesStoreTest.java | 42 +++++++++++++++++++---
 ...angeLoggingTimestampedWindowBytesStoreTest.java | 20 +++++++++++
 .../ChangeLoggingWindowBytesStoreTest.java         | 20 +++++++++++
 .../internals/InMemoryKeyValueLoggedStoreTest.java |  1 -
 .../state/internals/InMemoryKeyValueStoreTest.java |  1 -
 .../state/internals/InMemoryLRUCacheStoreTest.java |  1 -
 .../state/internals/MeteredKeyValueStoreTest.java  | 38 ++++++++++++++++++++
 .../state/internals/MeteredSessionStoreTest.java   | 38 ++++++++++++++++++++
 .../MeteredTimestampedKeyValueStoreTest.java       | 38 ++++++++++++++++++++
 .../MeteredTimestampedWindowStoreTest.java         | 41 +++++++++++++++++++++
 .../state/internals/MeteredWindowStoreTest.java    | 40 +++++++++++++++++++++
 .../state/internals/RocksDBKeyValueStoreTest.java  |  1 -
 .../kafka/streams/MockProcessorContextTest.java    |  3 +-
 .../streams/internals/KeyValueStoreFacadeTest.java |  1 +
 .../streams/internals/WindowStoreFacadeTest.java   |  1 +
 .../wordcount/WindowedWordCountProcessorTest.java  |  4 +++
 30 files changed, 414 insertions(+), 25 deletions(-)

diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index bec77e6..ba52990 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.assertTrue;
  * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
  */
 public class WordCountProcessorTest {
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void test() {
         final MockProcessorContext context = new MockProcessorContext();
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
index 98d5012..6bb8c6c 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertTrue;
  * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Transformer} in the {@link WordCountTransformerDemo}.
  */
 public class WordCountTransformerTest {
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void test() {
         final MockProcessorContext context = new MockProcessorContext();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index 20eb2c0..4424cdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -54,6 +54,12 @@ abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends Wra
     }
 
     @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
     public void close() {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 0bfe452..d077089 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -32,6 +31,8 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
+import java.util.List;
+
 abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
     static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
 
@@ -47,6 +48,12 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
     }
 
     @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
     public void close() {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 055a0f4..d6526a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -53,7 +53,8 @@ public class ChangeLoggingKeyValueBytesStore
                      final StateStore root) {
         super.init(context, root);
         this.context = asInternalProcessorContext(context);
-        maybeSetEvictionListener(); }
+        maybeSetEvictionListener();
+    }
 
     private void maybeSetEvictionListener() {
         // if the inner store is an LRU cache, add the eviction listener to log removed record
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index 55a73c2..b6d6504 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
 import java.io.File;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index f72b3af..3241d07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -27,8 +27,6 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -67,7 +65,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.function.Supplier;
 
 import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
 import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.getMetricsImpl;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
index 59eb469..36fe0e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
 import java.io.File;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 1ae6630..eba6f06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.To;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 4ab15d8..3284b17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index c6fe483..89e2b0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -100,6 +100,31 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         return store;
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final KeyValueStore<Bytes, byte[]> inner = EasyMock.mock(InMemoryKeyValueStore.class);
+        final CachingKeyValueStore outer = new CachingKeyValueStore(inner);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final KeyValueStore<Bytes, byte[]> inner = EasyMock.mock(InMemoryKeyValueStore.class);
+        final CachingKeyValueStore outer = new CachingKeyValueStore(inner);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldSetFlushListener() {
         assertTrue(store.setFlushListener(null, true));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index f36629d..05e97a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -89,7 +90,7 @@ public class CachingSessionStoreTest {
     public void before() {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -99,6 +100,31 @@ public class CachingSessionStoreTest {
         cachingStore.close();
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final SessionStore<Bytes, byte[]> inner = EasyMock.mock(InMemorySessionStore.class);
+        final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final SessionStore<Bytes, byte[]> inner = EasyMock.mock(InMemorySessionStore.class);
+        final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldPutFetchFromCache() {
         cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 42b750b..2a04c48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -42,7 +43,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -113,6 +113,31 @@ public class CachingWindowStoreTest {
         cachingStore.close();
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final WindowStore<Bytes, byte[]> inner = EasyMock.mock(WindowStore.class);
+        final CachingWindowStore outer = new CachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final WindowStore<Bytes, byte[]> inner = EasyMock.mock(WindowStore.class);
+        final CachingWindowStore outer = new CachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldNotReturnDuplicatesInRanges() {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 9106580..c3808b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -21,11 +21,15 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,14 +53,19 @@ public class ChangeLoggingKeyValueBytesStoreTest {
 
     @Before
     public void before() {
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(
+        final InternalMockProcessorContext context = mockContext();
+        context.setTime(0);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private InternalMockProcessorContext mockContext() {
+        return new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
             collector,
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        context.setTime(0);
-        store.init((StateStoreContext) context, store);
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
     }
 
     @After
@@ -64,6 +73,31 @@ public class ChangeLoggingKeyValueBytesStoreTest {
         store.close();
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final InternalMockProcessorContext context = mockContext();
+        final KeyValueStore<Bytes, byte[]> innerMock = EasyMock.mock(InMemoryKeyValueStore.class);
+        final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
+        innerMock.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(innerMock);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(innerMock);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final InternalMockProcessorContext context = mockContext();
+        final KeyValueStore<Bytes, byte[]> innerMock = EasyMock.mock(InMemoryKeyValueStore.class);
+        final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
+        innerMock.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(innerMock);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(innerMock);
+    }
+
     @Test
     public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
         store.put(hi, there);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index e05b171..8295f7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -21,12 +21,16 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,14 +58,19 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
 
     @Before
     public void before() {
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(
+        final InternalMockProcessorContext context = mockContext();
+        context.setTime(0);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private InternalMockProcessorContext mockContext() {
+        return new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
             collector,
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        context.setTime(0);
-        store.init((StateStoreContext) context, store);
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
     }
 
     @After
@@ -69,6 +78,31 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
         store.close();
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final InternalMockProcessorContext context = mockContext();
+        final KeyValueStore<Bytes, byte[]> inner = EasyMock.mock(InMemoryKeyValueStore.class);
+        final StateStore outer = new ChangeLoggingTimestampedKeyValueBytesStore(inner);
+        inner.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final InternalMockProcessorContext context = mockContext();
+        final KeyValueStore<Bytes, byte[]> inner = EasyMock.mock(InMemoryKeyValueStore.class);
+        final StateStore outer = new ChangeLoggingTimestampedKeyValueBytesStore(inner);
+        inner.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
         store.put(hi, rawThere);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index e928678..6608739 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -65,6 +66,25 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
         store.init((StateStoreContext) context, store);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        inner.init((ProcessorContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((ProcessorContext) context, store);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        inner.init((StateStoreContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((StateStoreContext) context, store);
+        EasyMock.verify(inner);
+    }
+
     @Test
     @SuppressWarnings("deprecation")
     public void shouldLogPuts() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 82835bc..c2c31e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -63,6 +64,25 @@ public class ChangeLoggingWindowBytesStoreTest {
         store.init((StateStoreContext) context, store);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        inner.init((ProcessorContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((ProcessorContext) context, store);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        inner.init((StateStoreContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((StateStoreContext) context, store);
+        EasyMock.verify(inner);
+    }
+
     @Test
     @SuppressWarnings("deprecation")
     public void shouldLogPuts() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 6f4104e..f54fca1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index b6c46e2..9730936 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index ae791cb..a044eda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 83fffef..68faa15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -158,6 +159,43 @@ public class MeteredKeyValueStoreTest {
         metered.init((StateStoreContext) context, metered);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final KeyValueStore<Bytes, byte[]> inner = mock(KeyValueStore.class);
+        final MeteredKeyValueStore<String, String> outer = new MeteredKeyValueStore<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            Serdes.String()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final KeyValueStore<Bytes, byte[]> inner = mock(KeyValueStore.class);
+        final MeteredKeyValueStore<String, String> outer = new MeteredKeyValueStore<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            Serdes.String()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 0ff822e..a77dd07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -162,6 +163,43 @@ public class MeteredSessionStoreTest {
         store.init((StateStoreContext) context, store);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class);
+        final MeteredSessionStore<String, String> outer = new MeteredSessionStore<>(
+            inner,
+            STORE_TYPE,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class);
+        final MeteredSessionStore<String, String> outer = new MeteredSessionStore<>(
+            inner,
+            STORE_TYPE,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 3d28266..405ab4f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -162,6 +163,43 @@ public class MeteredTimestampedKeyValueStoreTest {
         metered.init((StateStoreContext) context, metered);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final KeyValueStore<Bytes, byte[]> inner = mock(InMemoryKeyValueStore.class);
+        final MeteredTimestampedKeyValueStore<String, String> outer = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(Serdes.String())
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final KeyValueStore<Bytes, byte[]> inner = mock(InMemoryKeyValueStore.class);
+        final MeteredTimestampedKeyValueStore<String, String> outer = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(Serdes.String())
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 9a9d763..315a1aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -42,6 +43,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.niceMock;
 import static org.easymock.EasyMock.replay;
@@ -95,6 +97,45 @@ public class MeteredTimestampedWindowStoreTest {
         );
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
+        final MeteredTimestampedWindowStore<String, String> outer = new MeteredTimestampedWindowStore<>(
+            inner,
+            WINDOW_SIZE_MS, // any size
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
+        final MeteredTimestampedWindowStore<String, String> outer = new MeteredTimestampedWindowStore<>(
+            inner,
+            WINDOW_SIZE_MS, // any size
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 7301694..18c0fa3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -150,6 +151,45 @@ public class MeteredWindowStoreTest {
         );
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
+        final MeteredWindowStore<String, String> outer = new MeteredWindowStore<>(
+            inner,
+            WINDOW_SIZE_MS, // any size
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new SerdeThatDoesntHandleNull()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
+        final MeteredWindowStore<String, String> outer = new MeteredWindowStore<>(
+            inner,
+            WINDOW_SIZE_MS, // any size
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new SerdeThatDoesntHandleNull()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index c0b5a12..e71809e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index fe901ed..6e2f4ed 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -231,8 +231,7 @@ public class MockProcessorContextTest {
 
         assertFalse(context.committed());
     }
-
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"deprecation", "unchecked"}) // TODO deprecation will be fixed in KAFKA-10437
     @Test
     public void shouldStoreAndReturnStateStores() {
         final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
index b7814c7..4d7a277 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
@@ -53,6 +53,7 @@ public class KeyValueStoreFacadeTest {
         keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldForwardInit() {
         final ProcessorContext context = mock(ProcessorContext.class);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
index 3347ddd..6a2c6bd 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
@@ -47,6 +47,7 @@ public class WindowStoreFacadeTest {
         windowStoreFacade = new WindowStoreFacade<>(mockedWindowTimestampStore);
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldForwardInit() {
         final ProcessorContext context = mock(ProcessorContext.class);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
index 4995a77..00a9f8f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
@@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThrows;
 
 public class WindowedWordCountProcessorTest {
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldWorkWithInMemoryStore() {
         final MockProcessorContext context = new MockProcessorContext();
@@ -86,6 +87,7 @@ public class WindowedWordCountProcessorTest {
         assertThat(capturedForwards.hasNext(), is(false));
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldWorkWithPersistentStore() throws IOException {
         final Properties properties = new Properties();
@@ -146,6 +148,7 @@ public class WindowedWordCountProcessorTest {
         }
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldFailWithLogging() {
         final MockProcessorContext context = new MockProcessorContext();
@@ -164,6 +167,7 @@ public class WindowedWordCountProcessorTest {
         assertThrows(IllegalArgumentException.class, () -> store.init(context, store));
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldFailWithCaching() {
         final MockProcessorContext context = new MockProcessorContext();