You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/07 02:33:45 UTC
[kafka] 01/05: KAFKA-10562: Delegate store wrappers to new init method
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 59e3e34b44a68627718a7f76ae39b08906e97ef5
Author: John Roesler <vv...@apache.org>
AuthorDate: Thu Oct 1 10:19:59 2020 -0500
KAFKA-10562: Delegate store wrappers to new init method
---
.../processor/internals/ProcessorContextUtils.java | 7 +++
.../state/internals/CachingKeyValueStore.java | 29 ++++++++++--
.../state/internals/CachingSessionStore.java | 17 +++++++
.../state/internals/CachingWindowStore.java | 12 +++++
.../internals/ChangeLoggingKeyValueBytesStore.java | 26 +++++++++++
.../internals/ChangeLoggingSessionBytesStore.java | 17 +++++++
.../internals/ChangeLoggingWindowBytesStore.java | 13 ++++++
...ValueToTimestampedKeyValueByteStoreAdapter.java | 6 +++
.../state/internals/MeteredKeyValueStore.java | 52 +++++++++++++++++----
.../state/internals/MeteredSessionStore.java | 53 ++++++++++++++++++----
.../internals/MeteredTimestampedKeyValueStore.java | 13 ++++++
.../internals/MeteredTimestampedWindowStore.java | 15 ++++++
.../state/internals/MeteredWindowStore.java | 50 ++++++++++++++++----
.../state/internals/RocksDBWindowStore.java | 14 ++++--
.../internals/TimestampedKeyValueStoreBuilder.java | 6 +++
.../internals/TimestampedWindowStoreBuilder.java | 6 +++
.../WindowToTimestampedWindowByteStoreAdapter.java | 6 +++
.../streams/state/internals/WrappedStateStore.java | 6 +++
.../ChangeLoggingSessionBytesStoreTest.java | 3 +-
...angeLoggingTimestampedWindowBytesStoreTest.java | 3 +-
.../ChangeLoggingWindowBytesStoreTest.java | 3 +-
.../streams/internals/KeyValueStoreFacade.java | 6 +++
.../kafka/streams/internals/WindowStoreFacade.java | 6 +++
23 files changed, 332 insertions(+), 37 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 78b1ff5..41a1197 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
/**
@@ -52,4 +53,10 @@ public final class ProcessorContextUtils {
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
}
+
+ public static String changelogFor(final StateStoreContext context, final String storeName) {
+ return context instanceof InternalProcessorContext
+ ? ((InternalProcessorContext) context).changelogFor(storeName)
+ : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index cae38e0..ba2f949 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -57,20 +58,40 @@ public class CachingKeyValueStore
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- initInternal(context);
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
+ initInternal((InternalProcessorContext) context);
+ super.init(context, root);
+ // save the stream thread as we only ever want to trigger a flush
+ // when the stream thread is the current thread.
+ streamThread = Thread.currentThread();
+ }
+
+ @Override
+ public void init(final StateStoreContext context,
+ final StateStore root) {
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
+ initInternal((InternalProcessorContext) context);
super.init(context, root);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
}
- private void initInternal(final ProcessorContext context) {
- this.context = (InternalProcessorContext) context;
+ private void initInternal(final InternalProcessorContext context) {
+ this.context = context;
this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name());
this.context.registerCacheFlushListener(cacheName, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
- putAndMaybeForward(entry, (InternalProcessorContext) context);
+ putAndMaybeForward(entry, context);
}
});
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 4ac43a2..616bb00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordQueue;
@@ -63,6 +64,22 @@ class CachingSessionStore
@Override
public void init(final ProcessorContext context, final StateStore root) {
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
+ initInternal((InternalProcessorContext) context);
+ super.init(context, root);
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
initInternal((InternalProcessorContext) context);
super.init(context, root);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 20be3a3..936f09c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -78,6 +79,17 @@ class CachingWindowStore
super.init(context, root);
}
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
+ initInternal((InternalProcessorContext) context);
+ super.init(context, root);
+ }
+
private void initInternal(final InternalProcessorContext context) {
this.context = context;
final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 236f218..d5205ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -40,6 +41,31 @@ public class ChangeLoggingKeyValueBytesStore
public void init(final ProcessorContext context,
final StateStore root) {
super.init(context, root);
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
+ this.context = (InternalProcessorContext) context;
+
+ // if the inner store is an LRU cache, add the eviction listener to log removed record
+ if (wrapped() instanceof MemoryLRUCache) {
+ ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
+ // pass null to indicate removal
+ log(key, null);
+ });
+ }
+ }
+
+ @Override
+ public void init(final StateStoreContext context,
+ final StateStore root) {
+ super.init(context, root);
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
this.context = (InternalProcessorContext) context;
// if the inner store is an LRU cache, add the eviction listener to log removed record
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index cc586d3..648a47e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@@ -41,6 +42,22 @@ class ChangeLoggingSessionBytesStore
@Override
public void init(final ProcessorContext context, final StateStore root) {
super.init(context, root);
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
+ this.context = (InternalProcessorContext) context;
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ super.init(context, root);
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
this.context = (InternalProcessorContext) context;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 8da413c..6eee6c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
@@ -56,6 +57,18 @@ class ChangeLoggingWindowBytesStore
}
@Override
+ public void init(final StateStoreContext context,
+ final StateStore root) {
+ if (!(context instanceof InternalProcessorContext)) {
+ throw new IllegalArgumentException(
+ "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
+ );
+ }
+ this.context = (InternalProcessorContext) context;
+ super.init(context, root);
+ }
+
+ @Override
public byte[] fetch(final Bytes key,
final long timestamp) {
return wrapped().fetch(key, timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index fa29974..bb2ef26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -88,6 +89,11 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
}
@Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ store.init(context, root);
+ }
+
+ @Override
public void flush() {
store.flush();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 31e2eff..8ca7686 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -65,7 +67,7 @@ public class MeteredKeyValueStore<K, V>
private Sensor rangeSensor;
private Sensor flushSensor;
private Sensor e2eLatencySensor;
- private ProcessorContext context;
+ private InternalProcessorContext context;
private StreamsMetricsImpl streamsMetrics;
private final String threadId;
private String taskId;
@@ -86,11 +88,36 @@ public class MeteredKeyValueStore<K, V>
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- this.context = context;
+ this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
taskId = context.taskId().toString();
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();
+ registerMetrics();
+ final Sensor restoreSensor =
+ StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+ // register and possibly restore the state from the logs
+ maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+ }
+
+ @Override
+ public void init(final StateStoreContext context,
+ final StateStore root) {
+ this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+ taskId = context.taskId().toString();
+ initStoreSerde(context);
+ streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+ registerMetrics();
+ final Sensor restoreSensor =
+ StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+ // register and possibly restore the state from the logs
+ maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+ }
+
+ private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
putAllSensor = StateStoreMetrics.putAllSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
@@ -100,11 +127,6 @@ public class MeteredKeyValueStore<K, V>
flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
deleteSensor = StateStoreMetrics.deleteSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
- final Sensor restoreSensor =
- StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-
- // register and possibly restore the state from the logs
- maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
@SuppressWarnings("unchecked")
@@ -112,6 +134,18 @@ public class MeteredKeyValueStore<K, V>
final String storeName = name();
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
serdes = new StateSerdes<>(
+ changelogTopic != null ?
+ changelogTopic :
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ }
+
+ @SuppressWarnings("unchecked")
+ void initStoreSerde(final StateStoreContext context) {
+ final String storeName = name();
+ final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+ serdes = new StateSerdes<>(
changelogTopic != null ?
changelogTopic :
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
@@ -250,7 +284,9 @@ public class MeteredKeyValueStore<K, V>
}
private void maybeRecordE2ELatency() {
- if (e2eLatencySensor.shouldRecord()) {
+ // Context is null if the provided context isn't an implementation of InternalProcessorContext.
+ // In that case, we _can't_ get the current timestamp, so we don't record anything.
+ if (e2eLatencySensor.shouldRecord() && context != null) {
final long currentTime = time.milliseconds();
final long e2eLatency = currentTime - context.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index d8ce02a..96fed94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -51,7 +53,7 @@ public class MeteredSessionStore<K, V>
private Sensor flushSensor;
private Sensor removeSensor;
private Sensor e2eLatencySensor;
- private ProcessorContext context;
+ private InternalProcessorContext context;
private final String threadId;
private String taskId;
@@ -71,16 +73,12 @@ public class MeteredSessionStore<K, V>
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- this.context = context;
+ this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
initStoreSerde(context);
taskId = context.taskId().toString();
streamsMetrics = (StreamsMetricsImpl) context.metrics();
- putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
- fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
- flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
- removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
- e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+ registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
@@ -88,6 +86,30 @@ public class MeteredSessionStore<K, V>
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
+ @Override
+ public void init(final StateStoreContext context,
+ final StateStore root) {
+ this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+ initStoreSerde(context);
+ taskId = context.taskId().toString();
+ streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+ registerMetrics();
+ final Sensor restoreSensor =
+ StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+ // register and possibly restore the state from the logs
+ maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+ }
+
+ private void registerMetrics() {
+ putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+ fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+ flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+ removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+ e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+ }
+
@SuppressWarnings("unchecked")
private void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
@@ -102,6 +124,19 @@ public class MeteredSessionStore<K, V>
}
@SuppressWarnings("unchecked")
+ private void initStoreSerde(final StateStoreContext context) {
+ final String storeName = name();
+ final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+ serdes = new StateSerdes<>(
+ changelogTopic != null ?
+ changelogTopic :
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde
+ );
+ }
+
+ @SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
final boolean sendOldValues) {
@@ -255,7 +290,9 @@ public class MeteredSessionStore<K, V>
}
private void maybeRecordE2ELatency() {
- if (e2eLatencySensor.shouldRecord()) {
+ // Context is null if the provided context isn't an implementation of InternalProcessorContext.
+ // In that case, we _can't_ get the current timestamp, so we don't record anything.
+ if (e2eLatencySensor.shouldRecord() && context != null) {
final long currentTime = time.milliseconds();
final long e2eLatency = currentTime - context.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 022c1f8..e4e6d08 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -62,6 +63,18 @@ public class MeteredTimestampedKeyValueStore<K, V>
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
}
+ @SuppressWarnings("unchecked")
+ void initStoreSerde(final StateStoreContext context) {
+ final String storeName = name();
+ final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+ serdes = new StateSerdes<>(
+ changelogTopic != null ?
+ changelogTopic :
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
+ }
+
public RawAndDeserializedValue<V> getWithBinary(final K key) {
try {
return maybeMeasureLatency(() -> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index 3c386f3..0d83aba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.StateSerdes;
@@ -61,4 +62,18 @@ class MeteredTimestampedWindowStore<K, V>
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde
);
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ void initStoreSerde(final StateStoreContext context) {
+ final String storeName = name();
+ final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+ serdes = new StateSerdes<>(
+ changelogTopic != null ?
+ changelogTopic :
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde
+ );
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 85c3ec2..c4cd1f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -50,7 +52,7 @@ public class MeteredWindowStore<K, V>
private Sensor fetchSensor;
private Sensor flushSensor;
private Sensor e2eLatencySensor;
- private ProcessorContext context;
+ private InternalProcessorContext context;
private final String threadId;
private String taskId;
@@ -72,15 +74,28 @@ public class MeteredWindowStore<K, V>
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- this.context = context;
+ this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
initStoreSerde(context);
streamsMetrics = (StreamsMetricsImpl) context.metrics();
taskId = context.taskId().toString();
- putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
- fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
- flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
- e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+ registerMetrics();
+ final Sensor restoreSensor =
+ StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+ // register and possibly restore the state from the logs
+ maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+ }
+
+ @Override
+ public void init(final StateStoreContext context,
+ final StateStore root) {
+ this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+ initStoreSerde(context);
+ streamsMetrics = (StreamsMetricsImpl) context.metrics();
+ taskId = context.taskId().toString();
+
+ registerMetrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
@@ -88,6 +103,13 @@ public class MeteredWindowStore<K, V>
maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
}
+ private void registerMetrics() {
+ putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+ fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+ flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+ e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+ }
+
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
final String storeName = name();
@@ -101,6 +123,18 @@ public class MeteredWindowStore<K, V>
}
@SuppressWarnings("unchecked")
+ void initStoreSerde(final StateStoreContext context) {
+ final String storeName = name();
+ final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
+ serdes = new StateSerdes<>(
+ changelogTopic != null ?
+ changelogTopic :
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ }
+
+ @SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
final boolean sendOldValues) {
@@ -122,7 +156,7 @@ public class MeteredWindowStore<K, V>
@Override
public void put(final K key,
final V value) {
- put(key, value, context.timestamp());
+ put(key, value, context != null ? context.timestamp() : 0L);
}
@Override
@@ -264,7 +298,7 @@ public class MeteredWindowStore<K, V>
}
private void maybeRecordE2ELatency() {
- if (e2eLatencySensor.shouldRecord()) {
+ if (e2eLatencySensor.shouldRecord() && context != null) {
final long currentTime = time.milliseconds();
final long e2eLatency = currentTime - context.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 53f0615..7c779a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -31,7 +33,7 @@ public class RocksDBWindowStore
private final boolean retainDuplicates;
private final long windowSize;
- private ProcessorContext context;
+ private InternalProcessorContext context;
private int seqnum = 0;
RocksDBWindowStore(final SegmentedBytesStore bytesStore,
@@ -44,14 +46,20 @@ public class RocksDBWindowStore
@Override
public void init(final ProcessorContext context, final StateStore root) {
- this.context = context;
+ this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+ super.init(context, root);
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
super.init(context, root);
}
@Deprecated
@Override
public void put(final Bytes key, final byte[] value) {
- put(key, value, context.timestamp());
+ put(key, value, context != null ? context.timestamp() : 0L);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index be8f259..522472d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -101,6 +102,11 @@ public class TimestampedKeyValueStoreBuilder<K, V>
}
@Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ wrapped.init(context, root);
+ }
+
+ @Override
public void put(final Bytes key,
final byte[] value) {
wrapped.put(key, value);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index a544262..454b123 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -114,6 +115,11 @@ public class TimestampedWindowStoreBuilder<K, V>
wrapped.init(context, root);
}
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ wrapped.init(context, root);
+ }
+
@Deprecated
@Override
public void put(final Bytes key,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index da32599..bfeb33e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -167,6 +168,11 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
}
@Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ store.init(context, root);
+ }
+
+ @Override
public void flush() {
store.flush();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 208bc06..c4b4b53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.TimestampedBytesStore;
/**
@@ -48,6 +49,11 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
wrapped.init(context, root);
}
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ wrapped.init(context, root);
+ }
+
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<K, V> listener,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 426a334..8753342 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -57,7 +56,7 @@ public class ChangeLoggingSessionBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init((ProcessorContext) context, store);
+ inner.init((StateStoreContext) context, store);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 9de2207..e928678 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -18,7 +18,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -59,7 +58,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init((ProcessorContext) context, store);
+ inner.init((StateStoreContext) context, store);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index c877ac6..82835bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -18,7 +18,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -57,7 +56,7 @@ public class ChangeLoggingWindowBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init((ProcessorContext) context, store);
+ inner.init((StateStoreContext) context, store);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
index 749c41f..bfb8433 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -42,6 +43,11 @@ public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V>
}
@Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ inner.init(context, root);
+ }
+
+ @Override
public void put(final K key,
final V value) {
inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
index 6945059..6186c2a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -41,6 +42,11 @@ public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> imp
inner.init(context, root);
}
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ inner.init(context, root);
+ }
+
@Deprecated
@Override
public void put(final K key,