You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/07 02:33:48 UTC
[kafka] 04/05: wip
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f4448fd066d2839e8f15c29a7f91015023eb06f0
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Oct 6 11:58:05 2020 -0500
wip
---
.../internals/AbstractRocksDBSegmentedBytesStore.java | 1 +
.../streams/state/internals/InMemorySessionStore.java | 1 +
.../kafka/streams/state/internals/InMemoryWindowStore.java | 1 +
.../kafka/streams/state/internals/KeyValueSegment.java | 6 ++++--
.../kafka/streams/state/internals/KeyValueSegments.java | 2 +-
.../kafka/streams/state/internals/TimestampedSegment.java | 7 ++++---
.../kafka/streams/state/internals/TimestampedSegments.java | 2 +-
.../internals/GlobalProcessorContextImplTest.java | 11 ++++++-----
.../processor/internals/ProcessorContextImplTest.java | 3 ++-
.../kafka/streams/state/KeyValueStoreTestDriver.java | 3 ++-
.../org/apache/kafka/streams/state/NoOpWindowStore.java | 1 +
.../streams/state/internals/AbstractKeyValueStoreTest.java | 3 ++-
.../streams/state/internals/CachingKeyValueStoreTest.java | 2 +-
.../internals/ChangeLoggingSessionBytesStoreTest.java | 7 +++++--
.../state/internals/InMemoryKeyValueLoggedStoreTest.java | 3 ++-
.../streams/state/internals/InMemoryKeyValueStoreTest.java | 3 ++-
.../streams/state/internals/InMemoryLRUCacheStoreTest.java | 3 ++-
.../kafka/streams/state/internals/KeyValueSegmentTest.java | 2 +-
.../streams/state/internals/ReadOnlyWindowStoreStub.java | 1 +
.../streams/state/internals/RocksDBKeyValueStoreTest.java | 3 ++-
.../kafka/streams/state/internals/RocksDBStoreTest.java | 14 +++++++-------
.../streams/state/internals/TimestampedSegmentTest.java | 2 +-
.../apache/kafka/test/GenericInMemoryKeyValueStore.java | 1 +
.../test/GenericInMemoryTimestampedKeyValueStore.java | 1 +
.../test/java/org/apache/kafka/test/MockKeyValueStore.java | 1 +
.../test/java/org/apache/kafka/test/NoOpReadOnlyStore.java | 1 +
.../org/apache/kafka/test/ReadOnlySessionStoreStub.java | 1 +
.../kafka/streams/internals/KeyValueStoreFacade.java | 1 +
.../apache/kafka/streams/internals/WindowStoreFacade.java | 1 +
29 files changed, 57 insertions(+), 31 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 2e9882d..1014bfa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -225,6 +225,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
return name;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index e4fda06..2e45b48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -72,6 +72,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
return name;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
this.context = (InternalProcessorContext) context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 49322b0..cd50b15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -85,6 +85,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
return name;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
this.context = context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index a64df19..55a73c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import java.io.File;
import java.io.IOException;
+import java.util.Map;
import java.util.Objects;
class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment>, Segment {
@@ -45,8 +47,8 @@ class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment
}
@Override
- public void openDB(final ProcessorContext context) {
- super.openDB(context);
+ public void openDB(final Map<String, Object> configs, final File stateDir) {
+ super.openDB(configs, stateDir);
// skip the registering step
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index c8b4b90..a17666e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -48,7 +48,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
throw new IllegalStateException("KeyValueSegment already exists. Possible concurrent access.");
}
- newSegment.openDB(context);
+ newSegment.openDB(context.appConfigs(), context.stateDir());
return newSegment;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
index 3401d3e..59eb469 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import java.io.File;
import java.io.IOException;
+import java.util.Map;
import java.util.Objects;
class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<TimestampedSegment>, Segment {
@@ -45,10 +47,9 @@ class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<T
}
@Override
- public void openDB(final ProcessorContext context) {
- super.openDB(context);
+ public void openDB(final Map<String, Object> configs, final File stateDir) {
+ super.openDB(configs, stateDir);
// skip the registering step
- internalProcessorContext = context;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 58f3bbc..7318208 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -48,7 +48,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
throw new IllegalStateException("TimestampedSegment already exists. Possible concurrent access.");
}
- newSegment.openDB(context);
+ newSegment.openDB(context.appConfigs(), context.stateDir());
return newSegment;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index a83c92b..1ae6630 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -147,7 +148,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
try {
- store.init((ProcessorContext) null, null);
+ store.init((StateStoreContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -156,7 +157,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForTimestampedKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
try {
- store.init((ProcessorContext) null, null);
+ store.init((StateStoreContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -165,7 +166,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForWindowStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
try {
- store.init((ProcessorContext) null, null);
+ store.init((StateStoreContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -174,7 +175,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForTimestampedWindowStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
try {
- store.init((ProcessorContext) null, null);
+ store.init((StateStoreContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -183,7 +184,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForSessionStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
try {
- store.init((ProcessorContext) null, null);
+ store.init((StateStoreContext) null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index ab88efa..50466e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -767,7 +768,7 @@ public class ProcessorContextImplTest {
assertTrue(store.persistent());
assertTrue(store.isOpen());
- checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()");
+ checkThrowsUnsupportedOperation(() -> store.init((StateStoreContext) null, null), "init()");
checkThrowsUnsupportedOperation(store::close, "close()");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 7a39121..11e707d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -331,7 +332,7 @@ public class KeyValueStoreTestDriver<K, V> {
* @return the processing context; never null
* @see #addEntryToRestoreLog(Object, Object)
*/
- public ProcessorContext context() {
+ public StateStoreContext context() {
return context;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index 39e5d03..0de4890 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -54,6 +54,7 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
return "";
}
+ @Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 4c9c044..4ab15d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -51,7 +52,7 @@ import static org.junit.Assert.fail;
public abstract class AbstractKeyValueStoreTest {
- protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);
+ protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
protected InternalMockProcessorContext context;
protected KeyValueStore<Integer, String> store;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 98f0ba6..c6fe483 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -88,7 +88,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("cache-store"),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 6f85116..c55c4e15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -64,7 +64,9 @@ public class ChangeLoggingSessionBytesStoreTest {
store.init((StateStoreContext) context, store);
}
- @Test void shouldDelegateDeprecatedInit() {
+ @SuppressWarnings("deprecation")
+ @Test
+ public void shouldDelegateDeprecatedInit() {
inner.init((ProcessorContext) context, store);
EasyMock.expectLastCall();
EasyMock.replay(inner);
@@ -72,7 +74,8 @@ public class ChangeLoggingSessionBytesStoreTest {
EasyMock.verify(inner);
}
- @Test void shouldDelegateInit() {
+ @Test
+ public void shouldDelegateInit() {
inner.init((StateStoreContext) context, store);
EasyMock.expectLastCall();
EasyMock.replay(inner);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 7c0d16c..6f4104e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -28,7 +29,7 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 6dc90ea..b6c46e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -31,7 +32,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 2a86cdd..ae791cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -36,7 +37,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.lruMap("my-store", 10),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index c062e61..71841bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -67,7 +67,7 @@ public class KeyValueSegmentTest {
expect(mockContext.stateDir()).andReturn(directory);
replay(mockContext);
- segment.openDB(mockContext);
+ segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
assertTrue(new File(directoryPath, "window").exists());
assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 55f91f1..96c3ae0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -382,6 +382,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
return null;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 5937af0..c0b5a12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -38,7 +39,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index dead979..e0aad64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -157,7 +157,7 @@ public class RocksDBStoreTest {
metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull());
replay(metricsRecorder);
- rocksDBStore.openDB(context);
+ rocksDBStore.openDB(context.appConfigs(), context.stateDir());
verify(metricsRecorder);
reset(metricsRecorder);
@@ -171,7 +171,7 @@ public class RocksDBStoreTest {
metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), notNull());
replay(metricsRecorder);
- rocksDBStore.openDB(context);
+ rocksDBStore.openDB(context.appConfigs(), context.stateDir());
verify(metricsRecorder);
reset(metricsRecorder);
@@ -182,7 +182,7 @@ public class RocksDBStoreTest {
rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
try {
context = getProcessorContext(RecordingLevel.DEBUG);
- rocksDBStore.openDB(context);
+ rocksDBStore.openDB(context.appConfigs(), context.stateDir());
reset(metricsRecorder);
metricsRecorder.removeValueProviders(DB_NAME);
replay(metricsRecorder);
@@ -212,7 +212,7 @@ public class RocksDBStoreTest {
metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull());
replay(metricsRecorder);
- rocksDBStore.openDB(context);
+ rocksDBStore.openDB(context.appConfigs(), context.stateDir());
verify(metricsRecorder);
reset(metricsRecorder);
}
@@ -242,7 +242,7 @@ public class RocksDBStoreTest {
"the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " +
"the RocksDB options.",
ProcessorStateException.class,
- () -> rocksDBStore.openDB(context)
+ () -> rocksDBStore.openDB(context.appConfigs(), context.stateDir())
);
}
@@ -268,7 +268,7 @@ public class RocksDBStoreTest {
metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), isNull(), notNull());
replay(metricsRecorder);
- rocksDBStore.openDB(context);
+ rocksDBStore.openDB(context.appConfigs(), context.stateDir());
verify(metricsRecorder);
reset(metricsRecorder);
}
@@ -310,7 +310,7 @@ public class RocksDBStoreTest {
assertTrue(tmpDir.setReadOnly());
- assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext));
+ assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext.appConfigs(), tmpContext.stateDir()));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 0e04851..3321810 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -67,7 +67,7 @@ public class TimestampedSegmentTest {
expect(mockContext.stateDir()).andReturn(directory);
replay(mockContext);
- segment.openDB(mockContext);
+ segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
assertTrue(new File(directoryPath, "window").exists());
assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index 649dc5b..72e6c26 100644
--- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -58,6 +58,7 @@ public class GenericInMemoryKeyValueStore<K extends Comparable, V>
return this.name;
}
+ @Deprecated
@Override
@SuppressWarnings("unchecked")
/* This is a "dummy" store used for testing;
diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
index b1b75a1..c77cbac 100644
--- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
@@ -59,6 +59,7 @@ public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
return this.name;
}
+ @Deprecated
@Override
@SuppressWarnings("unchecked")
/* This is a "dummy" store used for testing;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
index 7cb376f..d9e4acf 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
@@ -54,6 +54,7 @@ public class MockKeyValueStore implements KeyValueStore<Object, Object> {
return name;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index dd78c52..c6b5eee 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -69,6 +69,7 @@ public class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, Sta
return name;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
if (rocksdbStore) {
diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
index 4f6d5de..a2924fc 100644
--- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
@@ -92,6 +92,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
return "";
}
+ @Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
index bfb8433..94b5c8e 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
@@ -36,6 +36,7 @@ public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V>
super(inner);
}
+ @Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
index 6186c2a..342817e 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
@@ -36,6 +36,7 @@ public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> imp
super(store);
}
+ @Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {