You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/26 02:38:31 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9222: KAFKA-10437: Implement test-utils and StateStore changes for KIP-478

vvcephei commented on a change in pull request #9222:
URL: https://github.com/apache/kafka/pull/9222#discussion_r476988013



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -61,7 +84,14 @@
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final ProcessorContext<?, ?> context, final StateStore root) {
+        final org.apache.kafka.streams.processor.ProcessorContext adapted =
+            ProcessorContextReverseAdapter.adapt(
+                context,
+                new ProcessorContextReverseAdapter.UnsupportedDeprecatedForwarder()
+            );
+        init(adapted, root);
+    }

Review comment:
       We have to add this so that we can pass in the new ProcessorContext. The default implementation delegates to the old `init` method so that existing store implementations will function with no changes.
   
   If the only callers were internal, we could just adapt at the call site. Unfortunately, users can also call `StateStore#init`, and they would do it if they have their own store implementations or if they use `MockProcessorContext` to test a stateful processor.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class InternalProcessorContextReverseAdapter implements InternalProcessorContext {

Review comment:
       I just renamed this from `ProcessorContextReverseAdapter` (which confusingly implemented `InternalProcessorContext`), so that I could add an adapter specifically for non-Internal `ProcessorContext`s.

##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
##########
@@ -36,7 +36,7 @@
 public class WordCountProcessorTest {
     @Test
     public void test() {
-        final MockProcessorContext context = new MockProcessorContext();
+        final MockProcessorContext<String, String> context = new MockProcessorContext<>();

Review comment:
       Switching over to the mock of the new API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -45,6 +47,27 @@
      */
     String name();
 
+    /**
+     * Initializes this state store.
+     * <p>
+     * The implementation of this function must register the root store in the context via the
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+     * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+     * the second parameter should be an object of user's implementation
+     * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
+     * <p>
+     * Note that if the state store engine itself supports bulk writes, users can implement another
+     * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
+     * let users implement bulk-load restoration logic instead of restoring one record at a time.
+     * <p>
+     * This method is not called if {@link StateStore#init(ProcessorContext, org.apache.kafka.streams.processor.StateStore)}
+     * is implemented.
+     *
+     * @throws IllegalStateException If store gets registered after initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);

Review comment:
       Not sure why the diff happened this way, but this is the original `init` method. I've just qualified ProcessorContext and added this note to the javadoc: `This method is not called if {@link StateStore#init(ProcessorContext, StateStore)} is implemented.`
   
   Later on, when we deprecate `processor.ProcessorContext`, this method will also become deprecated, pushing store implementers to move to the new API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -18,35 +18,52 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
+public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext {

Review comment:
       Here's the new one, which directly adapts non-Internal `ProcessorContext`. This became necessary, since the `MockProcessorContext` is _not_ an `InternalProcessorContext`, but it still needs to be adapted internally.
   
   Once the KIP is complete, most of these adapters will go away.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -18,35 +18,52 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
+public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext {
+    private final ProcessorContext<Object, Object> delegate;
+    private final DeprecatedForwarder deprecatedForwarder;
 
-    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+    public interface DeprecatedForwarder {
+        <K, V> void forward(final K key, final V value, final int childIndex);
+    }
+
+    public static final class UnsupportedDeprecatedForwarder implements DeprecatedForwarder {
+        @Override
+        public <K, V> void forward(final K key, final V value, final int childIndex) {
+            throw new UnsupportedOperationException("Forwarding by index was deprecated in 2.0 and is not supported by this ProcessorContext.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static org.apache.kafka.streams.processor.ProcessorContext adapt(final ProcessorContext<?, ?> delegate,
+                                                                            final DeprecatedForwarder deprecatedForwarder) {

Review comment:
       In an attempt to avoid throwing an exception unless there's no alternative, callers of `adapt` have to define what happens if someone calls `forward(final K key, final V value, final int childIndex)`

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
       new MockProcessorContext implementing the new `api.ProcessorContext`. Note the generics.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MockApiProcessorContextTest {

Review comment:
       The test for the new `MockProcessorContext`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -18,35 +18,52 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
+public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext {
+    private final ProcessorContext<Object, Object> delegate;
+    private final DeprecatedForwarder deprecatedForwarder;
 
-    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+    public interface DeprecatedForwarder {
+        <K, V> void forward(final K key, final V value, final int childIndex);
+    }
+
+    public static final class UnsupportedDeprecatedForwarder implements DeprecatedForwarder {
+        @Override
+        public <K, V> void forward(final K key, final V value, final int childIndex) {
+            throw new UnsupportedOperationException("Forwarding by index was deprecated in 2.0 and is not supported by this ProcessorContext.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static org.apache.kafka.streams.processor.ProcessorContext adapt(final ProcessorContext<?, ?> delegate,
+                                                                            final DeprecatedForwarder deprecatedForwarder) {
         if (delegate instanceof ProcessorContextAdapter) {
-            return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
+            return ((ProcessorContextAdapter<?, ?>) delegate).delegate();

Review comment:
       If we're adapting an adapter, just unwrap it.

##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
##########
@@ -61,7 +61,7 @@ public void test() {
         context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
 
         // finally, we can verify the output.
-        final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
+        final Iterator<MockProcessorContext.CapturedForward<String, String>> capturedForwards = context.forwarded().iterator();

Review comment:
       Since the new ProcessorContext bounds the forward calls, I've also added types to the captured forwards.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -18,35 +18,52 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
+public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext {
+    private final ProcessorContext<Object, Object> delegate;
+    private final DeprecatedForwarder deprecatedForwarder;
 
-    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+    public interface DeprecatedForwarder {
+        <K, V> void forward(final K key, final V value, final int childIndex);
+    }

Review comment:
       Awkwardly, we have no way to adapt the old (deprecated) `forward(final K key, final V value, final int childIndex)`, since it doesn't exist on the new API. If we want to, we could add it, but I don't think we want to.
   
   Another option is to just take this moment to drop it from `processor.ProcessorContext`. It was deprecated in 2.0.0, quite a long time ago.
   
   As it is, I took the most conservative approach I could. We will throw an UnsupportedOperationException only as a last resort.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -45,6 +47,27 @@
      */
     String name();
 
+    /**
+     * Initializes this state store.
+     * <p>
+     * The implementation of this function must register the root store in the context via the
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+     * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+     * the second parameter should be an object of user's implementation
+     * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
+     * <p>
+     * Note that if the state store engine itself supports bulk writes, users can implement another
+     * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
+     * let users implement bulk-load restoration logic instead of restoring one record at a time.
+     * <p>
+     * This method is not called if {@link StateStore#init(ProcessorContext, StateStore)}
+     * is implemented.
+     *
+     * @throws IllegalStateException If store gets registered after initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);

Review comment:
       Not sure why the diff happened this way, but this is the existing `init` method. I just qualified the reference to `processor.ProcessorContext` and added the note to the javadoc: `This method is not called if {@link StateStore#init(ProcessorContext, StateStore)} is implemented.`
   
   Once the old `processor.ProcessorContext` is deprecated, this method will also become deprecated, pushing store implementers to use the other one instead.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
##########
@@ -47,7 +47,7 @@ private ProcessorAdapter(final org.apache.kafka.streams.processor.Processor<KIn,
     @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext<KOut, VOut> context) {
-        delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
+        delegate.init(InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));

Review comment:
       renamed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -18,35 +18,52 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
+public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext {
+    private final ProcessorContext<Object, Object> delegate;
+    private final DeprecatedForwarder deprecatedForwarder;
 
-    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+    public interface DeprecatedForwarder {
+        <K, V> void forward(final K key, final V value, final int childIndex);
+    }
+
+    public static final class UnsupportedDeprecatedForwarder implements DeprecatedForwarder {
+        @Override
+        public <K, V> void forward(final K key, final V value, final int childIndex) {
+            throw new UnsupportedOperationException("Forwarding by index was deprecated in 2.0 and is not supported by this ProcessorContext.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static org.apache.kafka.streams.processor.ProcessorContext adapt(final ProcessorContext<?, ?> delegate,
+                                                                            final DeprecatedForwarder deprecatedForwarder) {
         if (delegate instanceof ProcessorContextAdapter) {
-            return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
+            return ((ProcessorContextAdapter<?, ?>) delegate).delegate();
+        } else if (delegate instanceof InternalApiProcessorContext) {
+            return InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) delegate);

Review comment:
       If what we're adapting is actually an InternalProcessorContext, good news! We'll just use the other adapter instead.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -18,35 +18,52 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
+public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext {
+    private final ProcessorContext<Object, Object> delegate;
+    private final DeprecatedForwarder deprecatedForwarder;
 
-    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+    public interface DeprecatedForwarder {
+        <K, V> void forward(final K key, final V value, final int childIndex);
+    }
+
+    public static final class UnsupportedDeprecatedForwarder implements DeprecatedForwarder {
+        @Override
+        public <K, V> void forward(final K key, final V value, final int childIndex) {
+            throw new UnsupportedOperationException("Forwarding by index was deprecated in 2.0 and is not supported by this ProcessorContext.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static org.apache.kafka.streams.processor.ProcessorContext adapt(final ProcessorContext<?, ?> delegate,
+                                                                            final DeprecatedForwarder deprecatedForwarder) {
         if (delegate instanceof ProcessorContextAdapter) {
-            return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
+            return ((ProcessorContextAdapter<?, ?>) delegate).delegate();
+        } else if (delegate instanceof InternalApiProcessorContext) {
+            return InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) delegate);
         } else {
-            return new ProcessorContextReverseAdapter(delegate);
+            return new ProcessorContextReverseAdapter(delegate, deprecatedForwarder);

Review comment:
       Last resort: what we're adapting really is a plain `ProcessorContext`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -193,7 +130,7 @@ public Cancellable schedule(final Duration interval, final PunctuationType type,
     @Deprecated
     @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
-        delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name()));
+        deprecatedForwarder.forward(key, value, childIndex);

Review comment:
       Here's where we call the defined behavior instead of delegating to the nonexistent method on the delegate.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
##########
@@ -142,7 +143,7 @@ public void shouldNotAllowToSchedulePunctuations() {
     public void shouldNotAllowInitForKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);

Review comment:
       casting `null` to resolve the right overload.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private String topic;
+    private Integer partition;
+    private Long offset;
+    private Headers headers;
+    private Long timestamp;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<KForward, VForward>> capturedForwards = new LinkedList<>();

Review comment:
       Note, we're capturing typed values now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org