You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/07 02:33:46 UTC

[kafka] 02/05: wip

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e53cdf44111b9ca2a38424452bdb6f57b88498ff
Author: John Roesler <vv...@apache.org>
AuthorDate: Mon Oct 5 16:13:51 2020 -0500

    wip
---
 .../apache/kafka/streams/processor/StateStore.java   |  4 ++++
 .../processor/internals/ProcessorContextUtils.java   | 20 ++++++++++++++++++++
 .../state/internals/CachingKeyValueStore.java        | 11 ++++-------
 .../streams/state/internals/CachingSessionStore.java | 16 ++++------------
 .../streams/state/internals/CachingWindowStore.java  | 15 +++------------
 .../streams/state/internals/WrappedStateStore.java   |  1 +
 6 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 4f47b12..570bb62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -65,7 +65,11 @@ public interface StateStore {
      *
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @deprecated Since 2.7.0. Callers should invoke {@link this#init(StateStoreContext, StateStore)} instead.
+     *             Implementers may choose to implement this method for backward compatibility or to throw an
+     *             informative exception instead.
      */
+    @Deprecated
     void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 41a1197..a769157 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -59,4 +59,24 @@ public final class ProcessorContextUtils {
             ? ((InternalProcessorContext) context).changelogFor(storeName)
             : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
     }
+
+    public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+        if (context instanceof InternalProcessorContext) {
+            return (InternalProcessorContext) context;
+        } else {
+            throw new IllegalArgumentException(
+                "This component requires internal features of Kafka Streams and must be disabled for unit tests."
+            );
+        }
+    }
+
+    public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) {
+        if (context instanceof InternalProcessorContext) {
+            return (InternalProcessorContext) context;
+        } else {
+            throw new IllegalArgumentException(
+                "This component requires internal features of Kafka Streams and must be disabled for unit tests."
+            );
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index ba2f949..d88fb4c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
 import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
 
@@ -55,6 +56,7 @@ public class CachingKeyValueStore
         super(underlying);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
@@ -63,7 +65,7 @@ public class CachingKeyValueStore
                 "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
             );
         }
-        initInternal((InternalProcessorContext) context);
+        initInternal(asInternalProcessorContext(context));
         super.init(context, root);
         // save the stream thread as we only ever want to trigger a flush
         // when the stream thread is the current thread.
@@ -73,12 +75,7 @@ public class CachingKeyValueStore
     @Override
     public void init(final StateStoreContext context,
                      final StateStore root) {
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        initInternal((InternalProcessorContext) context);
+        initInternal(asInternalProcessorContext(context));
         super.init(context, root);
         // save the stream thread as we only ever want to trigger a flush
         // when the stream thread is the current thread.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 616bb00..c92123d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -35,6 +35,7 @@ import java.util.LinkedList;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
 import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
 
@@ -62,25 +63,16 @@ class CachingSessionStore
         this.maxObservedTimestamp = RecordQueue.UNKNOWN;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        initInternal((InternalProcessorContext) context);
+        initInternal(asInternalProcessorContext(context));
         super.init(context, root);
     }
 
     @Override
     public void init(final StateStoreContext context, final StateStore root) {
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        initInternal((InternalProcessorContext) context);
+        initInternal(asInternalProcessorContext(context));
         super.init(context, root);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 936f09c..aea32f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -38,6 +38,7 @@ import java.util.LinkedList;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
 import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
 
@@ -70,23 +71,13 @@ class CachingWindowStore
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        initInternal((InternalProcessorContext) context);
+        initInternal(asInternalProcessorContext(context));
         super.init(context, root);
     }
 
     @Override
     public void init(final StateStoreContext context, final StateStore root) {
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Caching requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        initInternal((InternalProcessorContext) context);
+        initInternal(asInternalProcessorContext(context));
         super.init(context, root);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index c4b4b53..e8244f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -43,6 +43,7 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
         this.wrapped = wrapped;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {