You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/07 02:33:46 UTC
[kafka] 02/05: wip
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e53cdf44111b9ca2a38424452bdb6f57b88498ff
Author: John Roesler <vv...@apache.org>
AuthorDate: Mon Oct 5 16:13:51 2020 -0500
wip
---
.../apache/kafka/streams/processor/StateStore.java | 4 ++++
.../processor/internals/ProcessorContextUtils.java | 20 ++++++++++++++++++++
.../state/internals/CachingKeyValueStore.java | 11 ++++-------
.../streams/state/internals/CachingSessionStore.java | 16 ++++------------
.../streams/state/internals/CachingWindowStore.java | 15 +++------------
.../streams/state/internals/WrappedStateStore.java | 1 +
6 files changed, 36 insertions(+), 31 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 4f47b12..570bb62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -65,7 +65,11 @@ public interface StateStore {
*
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
+ * @deprecated Since 2.7.0. Callers should invoke {@link this#init(StateStoreContext, StateStore)} instead.
+ * Implementers may choose to implement this method for backward compatibility or to throw an
+ * informative exception instead.
*/
+ @Deprecated
void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 41a1197..a769157 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -59,4 +59,24 @@ public final class ProcessorContextUtils {
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
}
+
+ public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+ if (context instanceof InternalProcessorContext) {
+ return (InternalProcessorContext) context;
+ } else {
+ throw new IllegalArgumentException(
+ "This component requires internal features of Kafka Streams and must be disabled for unit tests."
+ );
+ }
+ }
+
+ public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) {
+ if (context instanceof InternalProcessorContext) {
+ return (InternalProcessorContext) context;
+ } else {
+ throw new IllegalArgumentException(
+ "This component requires internal features of Kafka Streams and must be disabled for unit tests."
+ );
+ }
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index ba2f949..d88fb4c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
@@ -55,6 +56,7 @@ public class CachingKeyValueStore
super(underlying);
}
+ @Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
@@ -63,7 +65,7 @@ public class CachingKeyValueStore
"Caching requires internal features of KafkaStreams and must be disabled for unit tests."
);
}
- initInternal((InternalProcessorContext) context);
+ initInternal(asInternalProcessorContext(context));
super.init(context, root);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
@@ -73,12 +75,7 @@ public class CachingKeyValueStore
@Override
public void init(final StateStoreContext context,
final StateStore root) {
- if (!(context instanceof InternalProcessorContext)) {
- throw new IllegalArgumentException(
- "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
- );
- }
- initInternal((InternalProcessorContext) context);
+ initInternal(asInternalProcessorContext(context));
super.init(context, root);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 616bb00..c92123d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -35,6 +35,7 @@ import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Objects;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
@@ -62,25 +63,16 @@ class CachingSessionStore
this.maxObservedTimestamp = RecordQueue.UNKNOWN;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
- if (!(context instanceof InternalProcessorContext)) {
- throw new IllegalArgumentException(
- "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
- );
- }
- initInternal((InternalProcessorContext) context);
+ initInternal(asInternalProcessorContext(context));
super.init(context, root);
}
@Override
public void init(final StateStoreContext context, final StateStore root) {
- if (!(context instanceof InternalProcessorContext)) {
- throw new IllegalArgumentException(
- "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
- );
- }
- initInternal((InternalProcessorContext) context);
+ initInternal(asInternalProcessorContext(context));
super.init(context, root);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 936f09c..aea32f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
@@ -70,23 +71,13 @@ class CachingWindowStore
@Override
public void init(final ProcessorContext context, final StateStore root) {
- if (!(context instanceof InternalProcessorContext)) {
- throw new IllegalArgumentException(
- "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
- );
- }
- initInternal((InternalProcessorContext) context);
+ initInternal(asInternalProcessorContext(context));
super.init(context, root);
}
@Override
public void init(final StateStoreContext context, final StateStore root) {
- if (!(context instanceof InternalProcessorContext)) {
- throw new IllegalArgumentException(
- "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
- );
- }
- initInternal((InternalProcessorContext) context);
+ initInternal(asInternalProcessorContext(context));
super.init(context, root);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index c4b4b53..e8244f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -43,6 +43,7 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
this.wrapped = wrapped;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {