You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/07 02:33:48 UTC

[kafka] 04/05: wip

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f4448fd066d2839e8f15c29a7f91015023eb06f0
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Oct 6 11:58:05 2020 -0500

    wip
---
 .../internals/AbstractRocksDBSegmentedBytesStore.java      |  1 +
 .../streams/state/internals/InMemorySessionStore.java      |  1 +
 .../kafka/streams/state/internals/InMemoryWindowStore.java |  1 +
 .../kafka/streams/state/internals/KeyValueSegment.java     |  6 ++++--
 .../kafka/streams/state/internals/KeyValueSegments.java    |  2 +-
 .../kafka/streams/state/internals/TimestampedSegment.java  |  7 ++++---
 .../kafka/streams/state/internals/TimestampedSegments.java |  2 +-
 .../internals/GlobalProcessorContextImplTest.java          | 11 ++++++-----
 .../processor/internals/ProcessorContextImplTest.java      |  3 ++-
 .../kafka/streams/state/KeyValueStoreTestDriver.java       |  3 ++-
 .../org/apache/kafka/streams/state/NoOpWindowStore.java    |  1 +
 .../streams/state/internals/AbstractKeyValueStoreTest.java |  3 ++-
 .../streams/state/internals/CachingKeyValueStoreTest.java  |  2 +-
 .../internals/ChangeLoggingSessionBytesStoreTest.java      |  7 +++++--
 .../state/internals/InMemoryKeyValueLoggedStoreTest.java   |  3 ++-
 .../streams/state/internals/InMemoryKeyValueStoreTest.java |  3 ++-
 .../streams/state/internals/InMemoryLRUCacheStoreTest.java |  3 ++-
 .../kafka/streams/state/internals/KeyValueSegmentTest.java |  2 +-
 .../streams/state/internals/ReadOnlyWindowStoreStub.java   |  1 +
 .../streams/state/internals/RocksDBKeyValueStoreTest.java  |  3 ++-
 .../kafka/streams/state/internals/RocksDBStoreTest.java    | 14 +++++++-------
 .../streams/state/internals/TimestampedSegmentTest.java    |  2 +-
 .../apache/kafka/test/GenericInMemoryKeyValueStore.java    |  1 +
 .../test/GenericInMemoryTimestampedKeyValueStore.java      |  1 +
 .../test/java/org/apache/kafka/test/MockKeyValueStore.java |  1 +
 .../test/java/org/apache/kafka/test/NoOpReadOnlyStore.java |  1 +
 .../org/apache/kafka/test/ReadOnlySessionStoreStub.java    |  1 +
 .../kafka/streams/internals/KeyValueStoreFacade.java       |  1 +
 .../apache/kafka/streams/internals/WindowStoreFacade.java  |  1 +
 29 files changed, 57 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 2e9882d..1014bfa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -225,6 +225,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
         return name;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index e4fda06..2e45b48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -72,6 +72,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
         return name;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = (InternalProcessorContext) context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 49322b0..cd50b15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -85,6 +85,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         return name;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index a64df19..55a73c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;
 
 class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment>, Segment {
@@ -45,8 +47,8 @@ class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment
     }
 
     @Override
-    public void openDB(final ProcessorContext context) {
-        super.openDB(context);
+    public void openDB(final Map<String, Object> configs, final File stateDir) {
+        super.openDB(configs, stateDir);
         // skip the registering step
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index c8b4b90..a17666e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -48,7 +48,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
                 throw new IllegalStateException("KeyValueSegment already exists. Possible concurrent access.");
             }
 
-            newSegment.openDB(context);
+            newSegment.openDB(context.appConfigs(), context.stateDir());
             return newSegment;
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
index 3401d3e..59eb469 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;
 
 class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<TimestampedSegment>, Segment {
@@ -45,10 +47,9 @@ class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<T
     }
 
     @Override
-    public void openDB(final ProcessorContext context) {
-        super.openDB(context);
+    public void openDB(final Map<String, Object> configs, final File stateDir) {
+        super.openDB(configs, stateDir);
         // skip the registering step
-        internalProcessorContext = context;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 58f3bbc..7318208 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -48,7 +48,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
                 throw new IllegalStateException("TimestampedSegment already exists. Possible concurrent access.");
             }
 
-            newSegment.openDB(context);
+            newSegment.openDB(context.appConfigs(), context.stateDir());
             return newSegment;
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index a83c92b..1ae6630 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -147,7 +148,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init((StateStoreContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -156,7 +157,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init((StateStoreContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -165,7 +166,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForWindowStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init((StateStoreContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -174,7 +175,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedWindowStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init((StateStoreContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -183,7 +184,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForSessionStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init((StateStoreContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index ab88efa..50466e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -767,7 +768,7 @@ public class ProcessorContextImplTest {
         assertTrue(store.persistent());
         assertTrue(store.isOpen());
 
-        checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()");
+        checkThrowsUnsupportedOperation(() -> store.init((StateStoreContext) null, null), "init()");
         checkThrowsUnsupportedOperation(store::close, "close()");
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 7a39121..11e707d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -331,7 +332,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @return the processing context; never null
      * @see #addEntryToRestoreLog(Object, Object)
      */
-    public ProcessorContext context() {
+    public StateStoreContext context() {
         return context;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index 39e5d03..0de4890 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -54,6 +54,7 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
         return "";
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 4c9c044..4ab15d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -51,7 +52,7 @@ import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
 
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);
+    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
 
     protected InternalMockProcessorContext context;
     protected KeyValueStore<Integer, String> store;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 98f0ba6..c6fe483 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -88,7 +88,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore("cache-store"),
                 (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 6f85116..c55c4e15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -64,7 +64,9 @@ public class ChangeLoggingSessionBytesStoreTest {
         store.init((StateStoreContext) context, store);
     }
 
-    @Test void shouldDelegateDeprecatedInit() {
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
         inner.init((ProcessorContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner);
@@ -72,7 +74,8 @@ public class ChangeLoggingSessionBytesStoreTest {
         EasyMock.verify(inner);
     }
 
-    @Test void shouldDelegateInit() {
+    @Test
+    public void shouldDelegateInit() {
         inner.init((StateStoreContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 7c0d16c..6f4104e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -28,7 +29,7 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
             Stores.inMemoryKeyValueStore("my-store"),
             (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 6dc90ea..b6c46e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -31,7 +32,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
             Stores.inMemoryKeyValueStore("my-store"),
             (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 2a86cdd..ae791cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -36,7 +37,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.lruMap("my-store", 10),
                 (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index c062e61..71841bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -67,7 +67,7 @@ public class KeyValueSegmentTest {
         expect(mockContext.stateDir()).andReturn(directory);
         replay(mockContext);
 
-        segment.openDB(mockContext);
+        segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
 
         assertTrue(new File(directoryPath, "window").exists());
         assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 55f91f1..96c3ae0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -382,6 +382,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
         return null;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 5937af0..c0b5a12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -38,7 +39,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
             Stores.persistentKeyValueStore("my-store"),
             (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index dead979..e0aad64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -157,7 +157,7 @@ public class RocksDBStoreTest {
         metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull());
         replay(metricsRecorder);
 
-        rocksDBStore.openDB(context);
+        rocksDBStore.openDB(context.appConfigs(), context.stateDir());
 
         verify(metricsRecorder);
         reset(metricsRecorder);
@@ -171,7 +171,7 @@ public class RocksDBStoreTest {
         metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), notNull());
         replay(metricsRecorder);
 
-        rocksDBStore.openDB(context);
+        rocksDBStore.openDB(context.appConfigs(), context.stateDir());
 
         verify(metricsRecorder);
         reset(metricsRecorder);
@@ -182,7 +182,7 @@ public class RocksDBStoreTest {
         rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
         try {
             context = getProcessorContext(RecordingLevel.DEBUG);
-            rocksDBStore.openDB(context);
+            rocksDBStore.openDB(context.appConfigs(), context.stateDir());
             reset(metricsRecorder);
             metricsRecorder.removeValueProviders(DB_NAME);
             replay(metricsRecorder);
@@ -212,7 +212,7 @@ public class RocksDBStoreTest {
         metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull());
         replay(metricsRecorder);
 
-        rocksDBStore.openDB(context);
+        rocksDBStore.openDB(context.appConfigs(), context.stateDir());
         verify(metricsRecorder);
         reset(metricsRecorder);
     }
@@ -242,7 +242,7 @@ public class RocksDBStoreTest {
                 "the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " +
                 "the RocksDB options.",
             ProcessorStateException.class,
-            () -> rocksDBStore.openDB(context)
+            () -> rocksDBStore.openDB(context.appConfigs(), context.stateDir())
         );
     }
 
@@ -268,7 +268,7 @@ public class RocksDBStoreTest {
         metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), isNull(), notNull());
         replay(metricsRecorder);
 
-        rocksDBStore.openDB(context);
+        rocksDBStore.openDB(context.appConfigs(), context.stateDir());
         verify(metricsRecorder);
         reset(metricsRecorder);
     }
@@ -310,7 +310,7 @@ public class RocksDBStoreTest {
 
         assertTrue(tmpDir.setReadOnly());
 
-        assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext));
+        assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext.appConfigs(), tmpContext.stateDir()));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 0e04851..3321810 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -67,7 +67,7 @@ public class TimestampedSegmentTest {
         expect(mockContext.stateDir()).andReturn(directory);
         replay(mockContext);
 
-        segment.openDB(mockContext);
+        segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
 
         assertTrue(new File(directoryPath, "window").exists());
         assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index 649dc5b..72e6c26 100644
--- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -58,6 +58,7 @@ public class GenericInMemoryKeyValueStore<K extends Comparable, V>
         return this.name;
     }
 
+    @Deprecated
     @Override
     @SuppressWarnings("unchecked")
     /* This is a "dummy" store used for testing;
diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
index b1b75a1..c77cbac 100644
--- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
@@ -59,6 +59,7 @@ public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
         return this.name;
     }
 
+    @Deprecated
     @Override
     @SuppressWarnings("unchecked")
     /* This is a "dummy" store used for testing;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
index 7cb376f..d9e4acf 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
@@ -54,6 +54,7 @@ public class MockKeyValueStore implements KeyValueStore<Object, Object> {
         return name;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index dd78c52..c6b5eee 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -69,6 +69,7 @@ public class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, Sta
         return name;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         if (rocksdbStore) {
diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
index 4f6d5de..a2924fc 100644
--- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
@@ -92,6 +92,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
         return "";
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
index bfb8433..94b5c8e 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
@@ -36,6 +36,7 @@ public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V>
         super(inner);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
index 6186c2a..342817e 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
@@ -36,6 +36,7 @@ public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> imp
         super(store);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {